summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPrzemyslaw Wasala <przemyslaw.wasala@nokia.com>2018-09-18 06:53:20 +0000
committerGerrit Code Review <gerrit@onap.org>2018-09-18 06:53:20 +0000
commitce7df378e41e29306d8040a307dc58b3a988daef (patch)
tree6c7873ac07319ead3119b3402530873d5295b9ef
parentdb0783dbb093ea20a9e5c0eb0ac266e72e814fe3 (diff)
parenteb766269ca42c91a985797c15fdaa9de255b904e (diff)
Merge "Align with latest HV-VES proto definition"
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt9
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt32
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt15
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt4
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt22
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt6
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt88
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt27
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt10
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt19
-rw-r--r--hv-collector-domain/pom.xml2
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt35
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt17
-rw-r--r--hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto54
-rw-r--r--hv-collector-domain/src/main/proto/VesEvent-v5.proto88
-rw-r--r--hv-collector-domain/src/main/proto/event/VesEvent.proto74
-rw-r--r--hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto43
-rw-r--r--hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto104
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt1
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt9
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt35
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt2
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt2
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt13
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt40
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt7
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt48
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt45
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt140
36 files changed, 535 insertions, 475 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index 8affa0b1..a4a4374c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
internal object MessageValidator {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index a7780109..1d43588f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -23,7 +23,7 @@ import arrow.core.Try
import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventOuterClass.VesEvent
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d8ea45d6..d08ad9e9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
@@ -116,7 +115,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
for (route in routing) {
val routeObj = route.asJsonObject()
defineRoute {
- fromDomain(forNumber(routeObj.getInt("fromDomain")))
+ fromDomain(routeObj.getString("fromDomain"))
toTopic(routeObj.getString("toTopic"))
withFixedPartitioning()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b611e9aa..a0c22418 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index a00a02d2..18191952 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 03996fd5..f5bfcce1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index e9cd5f3f..a42b982f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -20,8 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import arrow.core.Option
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
@@ -29,7 +28,7 @@ data class Routing(val routes: List<Route>) {
Option.fromNullable(routes.find { it.applies(commonHeader) })
}
-data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
@@ -63,11 +62,11 @@ class RoutingBuilder {
class RouteBuilder {
- private lateinit var domain: Domain
+ private lateinit var domain: String
private lateinit var targetTopic: String
private lateinit var partitioning: (CommonEventHeader) -> Int
- fun fromDomain(domain: Domain) {
+ fun fromDomain(domain: String) {
this.domain = domain
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index 213f4544..443dfa2f 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -25,13 +25,14 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
+
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
@@ -46,8 +47,7 @@ internal object MessageValidatorTest : Spek({
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
- Domain.values()
- .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) }
+ VesEventDomain.values()
.forEach { domain ->
it("should accept message with $domain domain") {
val header = commonHeader(domain)
@@ -65,26 +65,8 @@ internal object MessageValidatorTest : Spek({
}
}
-
- val domainTestCases = mapOf(
- Domain.DOMAIN_UNDEFINED to false,
- Domain.FAULT to true
- )
-
- domainTestCases.forEach { value, expectedResult ->
- on("ves hv message including header with domain $value") {
- val commonEventHeader = commonHeader(value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
-
- it("should resolve validation result") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
- .isEqualTo(expectedResult)
- }
- }
- }
-
val priorityTestCases = mapOf(
- Priority.PRIORITY_UNDEFINED to false,
+ Priority.PRIORITY_NOT_PROVIDED to false,
Priority.HIGH to true
)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 91fa7c19..c2fe1cc5 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -27,11 +27,14 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -42,13 +45,13 @@ object RouterTest : Spek({
val config = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
+ fromDomain(HVMEAS.name)
toTopic("ves_rtpm")
withFixedPartitioning(2)
}
defineRoute {
- fromDomain(Domain.SYSLOG)
+ fromDomain(SYSLOG.name)
toTopic("ves_trace")
withFixedPartitioning()
}
@@ -56,7 +59,7 @@ object RouterTest : Spek({
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -77,7 +80,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -98,7 +101,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index a7d3971e..8950a557 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -41,7 +41,7 @@ internal object VesDecoderTest : Spek({
val cut = VesDecoder()
on("ves hv message bytes") {
- val commonHeader = commonHeader(Domain.HEARTBEAT)
+ val commonHeader = commonHeader(HEARTBEAT)
val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 59224cca..f21a2ecf 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,9 +28,11 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -62,14 +64,14 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("kafka:9093", it.kafkaBootstrapServers)
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
val route1 = it.routing.routes[0]
- assertEquals(Domain.FAULT, route1.domain)
+ assertEquals(FAULT.name, route1.domain)
assertEquals("test-topic-1", route1.targetTopic)
val route2 = it.routing.routes[1]
- assertEquals(Domain.HEARTBEAT, route2.domain)
+ assertEquals(HEARTBEAT.name, route2.domain)
assertEquals("test-topic-2", route2.targetTopic)
}.verifyComplete()
@@ -95,7 +97,7 @@ internal object ConsulConfigurationProviderTest : Spek({
.verifyErrorMessage("Test exception")
}
- it("should update the health state"){
+ it("should update the health state") {
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
.expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
@@ -128,21 +130,23 @@ private fun constructConsulConfigProvider(url: String,
}
+const val kafkaAddress = "message-router-kafka"
+
fun constructConsulResponse(): String {
val config = """{
- "dmaap.kafkaBootstrapServers": "kafka:9093",
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
"collector.routing": [
{
- "fromDomain": 1,
+ "fromDomain": "FAULT",
"toTopic": "test-topic-1"
},
{
- "fromDomain": 2,
+ "fromDomain": "HEARTBEAT",
"toTopic": "test-topic-2"
}
]
-}"""
+ }"""
val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index ba29844a..8e6103c9 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
@@ -62,7 +62,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
@@ -92,7 +92,7 @@ object PerformanceSpecification : Spek({
val timeout = Duration.ofSeconds(30)
val params = MessageParameters(
- commonEventHeader = commonHeader(HVRANMEAS),
+ commonEventHeader = commonHeader(HVMEAS),
messageType = VALID,
amount = numMessages
)
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index a9f3e9a4..60e10ee0 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,9 +24,13 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
-import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
@@ -39,7 +43,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Flux
import java.time.Duration
@@ -54,8 +58,8 @@ object VesHvSpecification : Spek({
it("should handle multiple HV RAN events") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS)
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HVMEAS)
)
assertThat(messages)
@@ -65,8 +69,8 @@ object VesHvSpecification : Spek({
it("should not handle messages received from client after end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
+ val anotherValidMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val handledEvents = sut.handleConnection(sink,
@@ -91,23 +95,19 @@ object VesHvSpecification : Spek({
describe("Memory management") {
it("should release memory for each handled and dropped message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
- val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS)
+ val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
- sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload)
+ sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload)
assertThat(handledEvents).hasSize(1)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
.isEqualTo(expectedRefCnt)
- assertThat(msgWithInvalidDomain.refCnt())
- .describedAs("message with invalid domain should be released")
- .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
.isEqualTo(expectedRefCnt)
@@ -118,7 +118,7 @@ object VesHvSpecification : Spek({
it("should release memory for end-of-transmission message") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val endOfTransmissionMessage = endOfTransmissionWireMessage()
val expectedRefCnt = 0
@@ -138,7 +138,7 @@ object VesHvSpecification : Spek({
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload()
val expectedRefCnt = 0
@@ -157,7 +157,7 @@ object VesHvSpecification : Spek({
it("should release memory for each message with garbage frame") {
val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(Domain.HVRANMEAS)
+ val validMessage = vesWireFrameMessage(HVMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
@@ -179,11 +179,11 @@ object VesHvSpecification : Spek({
it("should direct message to a topic by means of routing configuration") {
val (sut, sink) = vesHvWithStoringSink()
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
@@ -193,17 +193,17 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration)
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HEARTBEAT),
- vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING))
+ vesWireFrameMessage(HVMEAS),
+ vesWireFrameMessage(HEARTBEAT),
+ vesWireFrameMessage(MEASUREMENT))
assertThat(messages).describedAs("number of routed messages").hasSize(3)
assertThat(messages[0].topic).describedAs("first message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[1].topic).describedAs("second message topic")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(messages[2].topic).describedAs("last message topic")
.isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
@@ -212,14 +212,14 @@ object VesHvSpecification : Spek({
it("should drop message if route was not found") {
val (sut, sink) = vesHvWithStoringSink()
val messages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.OTHER, "first"),
- vesWireFrameMessage(Domain.HVRANMEAS, "second"),
- vesWireFrameMessage(Domain.HEARTBEAT, "third"))
+ vesWireFrameMessage(OTHER, "first"),
+ vesWireFrameMessage(HVMEAS, "second"),
+ vesWireFrameMessage(HEARTBEAT, "third"))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
- assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC)
assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
}
}
@@ -253,41 +253,41 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithoutRouting)
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).isEmpty()
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(1)
val message = messagesAfterUpdate[0]
assertThat(message.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(message.partition).describedAs("routed message partition")
.isEqualTo(0)
}
it("should change domain routing") {
- val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messages).hasSize(1)
val firstMessage = messages[0]
assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration")
- .isEqualTo(HVRANMEAS_TOPIC)
+ .isEqualTo(HVMEAS_TOPIC)
assertThat(firstMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
- val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
assertThat(messagesAfterUpdate).hasSize(2)
val secondMessage = messagesAfterUpdate[1]
assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change")
- .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC)
+ .isEqualTo(ALTERNATE_HVMEAS_TOPIC)
assertThat(secondMessage.partition).describedAs("routed message partition")
.isEqualTo(0)
}
@@ -302,13 +302,13 @@ object VesHvSpecification : Spek({
sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting)
}
}.doOnNext {
- sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS))
+ sut.handleConnection(sink, vesWireFrameMessage(HVMEAS))
}.then().block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messagesAmount)
assertThat(messagesForEachTopic)
@@ -329,14 +329,14 @@ object VesHvSpecification : Spek({
println("config changed")
}
}
- .map { vesWireFrameMessage(Domain.HVRANMEAS) }
+ .map { vesWireFrameMessage(HVMEAS) }
sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout)
val messages = sink.sentMessages
- val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC }
- val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC }
+ val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC }
+ val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC }
assertThat(messages.size).isEqualTo(messageStreamSize)
assertThat(firstTopicMessagesCount)
@@ -373,9 +373,9 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val handledMessages = sut.handleConnection(sink,
- vesWireFrameMessage(Domain.HVRANMEAS, "first"),
- vesMessageWithTooBigPayload(Domain.HVRANMEAS),
- vesWireFrameMessage(Domain.HVRANMEAS))
+ vesWireFrameMessage(HVMEAS, "first"),
+ vesMessageWithTooBigPayload(HVMEAS),
+ vesWireFrameMessage(HVMEAS))
assertThat(handledMessages).hasSize(1)
assertThat(handledMessages.first().message.header.eventId).isEqualTo("first")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
index ebeaa69e..688f2758 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -20,24 +20,27 @@
package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
import reactor.retry.RetryExhaustedException
-const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+const val HVMEAS_TOPIC = "ves_hvRanMeas"
const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling"
-const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas"
+const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas"
val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()
@@ -47,17 +50,17 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.HEARTBEAT)
- toTopic(HVRANMEAS_TOPIC)
+ fromDomain(HEARTBEAT.name)
+ toTopic(HVMEAS_TOPIC)
withFixedPartitioning()
}
defineRoute {
- fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING)
+ fromDomain(MEASUREMENT.name)
toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC)
withFixedPartitioning()
}
@@ -69,8 +72,8 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu
kafkaBootstrapServers = "localhost:9969",
routing = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
- toTopic(ALTERNATE_HVRANMEAS_TOPIC)
+ fromDomain(HVMEAS.name)
+ toTopic(ALTERNATE_HVMEAS_TOPIC)
withFixedPartitioning()
}
}.build()
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 354edaeb..51f94cc4 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -30,7 +30,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
import java.io.InputStream
import javax.json.Json
@@ -68,22 +68,22 @@ class MessageStreamValidation(
parameters.all { it.messageType == MessageType.FIXED_PAYLOAD }
- private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean {
+ private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, expected: List<VesEventOuterClass.VesEvent>): Boolean {
val consumedHeaders = actual.map { it.commonEventHeader }
val generatedHeaders = expected.map { it.commonEventHeader }
return generatedHeaders == consumedHeaders
}
- private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> =
+ private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
messageGenerator.createMessageFlux(parameters)
.map(PayloadWireFrameMessage::payload)
.map(ByteData::unsafeAsArray)
- .map(VesEventV5.VesEvent::parseFrom)
+ .map(VesEventOuterClass.VesEvent::parseFrom)
.collectList()
.asIo()
private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) =
- consumedMessages.map(VesEventV5.VesEvent::parseFrom)
+ consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom)
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index c0ba5812..017360bb 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -36,8 +36,8 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anySet
import org.mockito.Mockito
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import java.util.concurrent.ConcurrentLinkedQueue
/**
@@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 2932367b..beef26b6 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -20,17 +20,10 @@
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
import arrow.core.Either
-import arrow.core.Left
-import arrow.core.None
import arrow.core.Right
-import arrow.core.Some
-import arrow.effects.IO
-import javax.json.stream.JsonParsingException
import com.google.protobuf.ByteString
import com.nhaarman.mockito_kotlin.any
import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.never
-import com.nhaarman.mockito_kotlin.verify
import com.nhaarman.mockito_kotlin.whenever
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
@@ -38,19 +31,15 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anyList
-import org.mockito.ArgumentMatchers.anySet
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
-import java.util.concurrent.ConcurrentLinkedQueue
-import javax.json.Json
-import javax.json.JsonArray
-import javax.json.JsonValue
+import javax.json.stream.JsonParsingException
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -216,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml
index eb2a7582..8f6ec874 100644
--- a/hv-collector-domain/pom.xml
+++ b/hv-collector-domain/pom.xml
@@ -74,7 +74,7 @@
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
<inputDirectories>
- <include>${project.basedir}/src/main/proto</include>
+ <include>${project.basedir}/src/main/proto/event</include>
</inputDirectories>
<outputTargets>
<outputTarget>
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt
new file mode 100644
index 00000000..3e99cdc8
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+enum class VesEventDomain {
+ FAULT,
+ HEARTBEAT,
+ MEASUREMENT,
+ MOBILE_FLOW,
+ OTHER,
+ PNFREGISTRATION,
+ SIP_SIGNALING,
+ STATE_CHANGE,
+ SYSLOG,
+ THRESHOLD_CROSSING_ALERT,
+ VOICE_QUALITY,
+ HVMEAS
+}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
index 91c75459..339a652c 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt
@@ -19,17 +19,18 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass
val headerRequiredFieldDescriptors = listOf(
"version",
- "eventName",
"domain",
- "eventId",
- "sourceName",
- "reportingEntityName",
+ "sequence",
"priority",
- "startEpochMicrosec",
+ "eventId",
+ "eventName",
"lastEpochMicrosec",
- "sequence")
- .map { fieldName -> VesEventV5.VesEvent.CommonEventHeader.getDescriptor().findFieldByName(fieldName) } \ No newline at end of file
+ "startEpochMicrosec",
+ "reportingEntityName",
+ "sourceName",
+ "vesEventListenerVersion")
+ .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
diff --git a/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto b/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto
deleted file mode 100644
index 5121f0eb..00000000
--- a/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-// Definition for RTPM
-
-message HVRanMeasFields {
- message HVRanMeasPayload {
- message PMObject {
- message HVRanMeas {
- uint32 measurement_id = 1;
- repeated uint32 counter_subid = 2;
- repeated sint64 counter_value = 3;
- repeated uint32 missing_counter_subid = 4;
- bool suspectFlagIncomplete = 5; // (some is data missing due to internal error)
- bool suspectFlagOutOfSync = 6; // (source time not aligned)
- }
-
- string uri = 1; // monitored object URI
- repeated HVRanMeas hvRanMeas = 2; // performance counters grouped by measurement types
- }
- repeated PMObject pmObject = 1;
- }
-
- message AdditionalField {
- string name = 1;
- string value = 2;
- }
-
- string hvRanMeasFieldsVersion = 1; // version of HVRanMeasFields message
- uint32 period_ms = 2; // period configured for reporting the data in milliseconds
- string timezone = 3; // timezone of Network Function sending the data
- string pmDictionaryVsn = 4; // vendor name + schema version E.g. NOKIA_LN7.0, uniquely identify the relevant PM dictionary
- HVRanMeasPayload hvRanMeasPayload = 5; // objects being monitored
- repeated AdditionalField additionalFields = 6; // array of name-value pairs if needed
-} \ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/VesEvent-v5.proto b/hv-collector-domain/src/main/proto/VesEvent-v5.proto
deleted file mode 100644
index 340133b2..00000000
--- a/hv-collector-domain/src/main/proto/VesEvent-v5.proto
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-syntax = "proto3";
-package org.onap.ves;
-
-message VesEvent {
-
- // VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
- // Source: https://git.opnfv.org/ves/tree/tests/docs/ves_data_model.json
- // 2017-05-13 Align with VES 5.0 schema.
- // blob: ca948ff67e8a2de4e2a47cffc4d4d2893170ab76
-
- message CommonEventHeader {
- string version = 1; // required, "version of the event header"
- enum Domain {
- DOMAIN_UNDEFINED = 0;
- FAULT = 1;
- HEARTBEAT = 2;
- MEASUREMENTS_FOR_VF_SCALING = 3;
- MOBILE_FLOW = 4;
- SIP_SIGNALING = 5;
- STATE_CHANGE = 6;
- SYSLOG = 7;
- THRESHOLD_CROSSING_ALERT = 8;
- VOICE_QUALITY = 9;
- OTHER = 10;
- HVRANMEAS = 11;
- }
- Domain domain = 2; // required, "the eventing domain associated with the event" [map to string]
-
- uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
-
- enum Priority {
- PRIORITY_UNDEFINED = 0;
- HIGH = 1;
- MEDIUM = 2;
- NORMAL = 3;
- LOW = 4;
- }
- Priority priority = 4; // required, "processing priority"
-
- string eventId = 5; // required, "event key that is unique to the event source"
- string eventName = 6; // required, "unique event name"
- string eventType = 7; // "for example - applicationVnf, guestOS, hostOS, platform"
-
- uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
-
- string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
- string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
-
- string reportingEntityId = 12; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
- bytes reportingEntityName = 13; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName"
- bytes sourceId = 14; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
- string sourceName = 15; // required, "name of the entity experiencing the event issue"
-
- reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
- reserved 100;
- }
-
- CommonEventHeader commonEventHeader = 1;
-
- oneof eventFields // required, payload, each high-volume domain has its specific GPB schema
- {
- bytes hvRanMeasFields = 2; // if domain==HVRANMEAS, GPB schema: HVRanMeasFields.proto
- }
-}
-
-message VesEventList {
- repeated VesEvent vesEvent = 1;
-}
diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto
new file mode 100644
index 00000000..54a6d149
--- /dev/null
+++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+
+message VesEvent {
+ CommonEventHeader commonEventHeader = 1; // required
+
+ oneof eventFields // required, payload
+ {
+ // each new high-volume domain can add an entry for its own GPB message
+ // the field can be opaque (bytes) to allow decoding the payload in a separate step
+ bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
+ }
+}
+
+// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+
+message CommonEventHeader {
+ string version = 1; // required, "version of the gpb common event header"
+ string domain = 2; // required, "the eventing domain associated with the event", allowed values:
+ // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+ // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+
+ uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
+
+ enum Priority {
+ PRIORITY_NOT_PROVIDED = 0;
+ HIGH = 1;
+ MEDIUM = 2;
+ NORMAL = 3;
+ LOW = 4;
+ }
+ Priority priority = 4; // required, "processing priority"
+
+ string eventId = 5; // required, "event key that is unique to the event source"
+ string eventName = 6; // required, "unique event name"
+ string eventType = 7; // "for example - guest05, platform"
+
+ uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+
+ string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
+ string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
+ string nfVendorName = 12; // " Vendor Name providing the nf "
+
+ bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+ string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+ bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+ string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
+ string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
+ string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+
+ reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+ reserved 100;
+}
diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
new file mode 100644
index 00000000..9a8582d5
--- /dev/null
+++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+import "MeasDataCollection.proto"; // for 3GPP PM format
+
+message HVMeasFields
+{
+ string hvMeasFieldsVersion = 1;
+ measDataCollection.MeasDataCollection measDataCollection = 2;
+ // From 3GPP TS 28.550
+ // Informative: mapping between similar header fields (format may be different)
+ // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader
+ // senderName sourceName
+ // senderType nfNamingCode + nfcNamingCode
+ // vendorName nfVendorName
+ // collectionBeginTime startEpochMicrosec
+ // timestamp lastEpochMicrosec
+ repeated HashMap eventAddlFlds = 3; // optional per-event data
+}
+
+message HashMap
+{
+ string name = 1;
+ string value = 2;
+} \ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
new file mode 100644
index 00000000..472dcc43
--- /dev/null
+++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
@@ -0,0 +1,104 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package measDataCollection;
+
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).
+// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
+// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.
+// Differences/additions to 3GPP TS 28.550 are marked with "%%".
+
+message MeasDataCollection // top-level message
+{
+ MeasHeader measHeader = 1;
+ repeated MeasData measData = 2; // %%: use a single instance for RTPM
+ MeasFooter measFooter = 3;
+}
+
+message MeasHeader
+{
+ string streamFormatVersion = 1;
+ string senderName = 2;
+ string senderType = 3;
+ string vendorName = 4;
+ string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)
+}
+
+message MeasData
+{
+ string measuredEntityId = 1; // DN as per 3GPP TS 32.300
+ string measuredEntityUserName = 2; // network function User Name
+ string measuredEntitySoftwareVersion = 3;
+ uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)
+ repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 6;
+}
+
+
+message MeasInfo
+{
+ oneof MeasInfoId { // measurement group identifier
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
+ }
+
+ oneof MeasTypes { // measurement identifiers associated with the measurement results
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ }
+ // Needed only because GPB does not support repeated fields directly inside 'oneof'
+ message IMeasTypes { repeated uint32 iMeasType = 1; }
+ message SMeasTypes { repeated string measType = 1; }
+
+ string jobIdList = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups
+}
+
+message MeasValue
+{
+ oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ }
+ repeated MeasResult measResults = 3;
+ bool suspectFlag = 4;
+ repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data
+}
+
+message MeasResult
+{
+ uint32 p = 1; // Optional index in the MeasTypes array
+ oneof xValue {
+ sint64 iValue = 2;
+ double rValue = 3;
+ bool isNull = 4;
+ }
+}
+
+message MeasFooter
+{
+ string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"
+}
+
+message nameValue // %%: vendor-defined name-value pair
+{
+ string name = 1;
+ string value = 2;
+} \ No newline at end of file
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index fa63c36e..b992d530 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.domain
import arrow.core.Either
import io.netty.buffer.ByteBuf
import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.ObjectAssert
import org.jetbrains.spek.api.Spek
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index c6aa89b2..78042260 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -25,7 +25,10 @@ import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
+
import java.util.UUID.randomUUID
@@ -39,7 +42,7 @@ private fun ByteBuf.writeValidWireFrameHeaders() {
writeByte(0x01) // content type = GPB
}
-fun vesWireFrameMessage(domain: Domain = Domain.OTHER,
+fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
id: String = randomUUID().toString()): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
@@ -70,7 +73,7 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesMessageWithTooBigPayload(domain: Domain = Domain.DOMAIN_UNDEFINED): ByteBuf =
+fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index 6aeb6206..0341c2ff 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -23,27 +23,31 @@ package org.onap.dcae.collectors.veshv.tests.utils
import com.google.protobuf.ByteString
import com.google.protobuf.MessageLite
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.ves.VesEventOuterClass
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
import java.util.UUID.randomUUID
-fun vesEvent(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun vesEvent(domain: VesEventDomain = HVMEAS,
id: String = randomUUID().toString(),
hvRanMeasFields: ByteString = ByteString.EMPTY
-): VesEventV5.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
+): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields)
-fun vesEvent(commonEventHeader: VesEventV5.VesEvent.CommonEventHeader,
- hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent =
- VesEventV5.VesEvent.newBuilder()
+fun vesEvent(commonEventHeader: CommonEventHeader,
+ hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
+ VesEventOuterClass.VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvRanMeasFields(hvRanMeasFields)
+ .setHvMeasFields(hvRanMeasFields)
.build()
-fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS,
+fun commonHeader(domain: VesEventDomain = HVMEAS,
id: String = randomUUID().toString(),
- priority: VesEventV5.VesEvent.CommonEventHeader.Priority = VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL): VesEventV5.VesEvent.CommonEventHeader =
- VesEventV5.VesEvent.CommonEventHeader.newBuilder()
+ priority: Priority = Priority.NORMAL): CommonEventHeader =
+ CommonEventHeader.newBuilder()
.setVersion("sample-version")
- .setDomain(domain)
+ .setDomain(domain.name)
.setSequence(1)
.setPriority(priority)
.setEventId(id)
@@ -53,13 +57,16 @@ fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEvent
.setLastEpochMicrosec(120034455)
.setNfNamingCode("sample-nf-naming-code")
.setNfcNamingCode("sample-nfc-naming-code")
- .setReportingEntityId("sample-reporting-entity-id")
- .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name"))
+ .setNfVendorName("vendor-name")
+ .setReportingEntityId(ByteString.copyFromUtf8("sample-reporting-entity-id"))
+ .setReportingEntityName("sample-reporting-entity-name")
.setSourceId(ByteString.copyFromUtf8("sample-source-id"))
.setSourceName("sample-source-name")
+ .setTimeZoneOffset("+1")
+ .setVesEventListenerVersion("another-version")
.build()
-fun vesEventBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
+fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData =
vesEvent(commonHeader, byteString).toByteData()
fun MessageLite.toByteData(): ByteData = ByteData(toByteArray()) \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
index 8d989cc5..047d863c 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
index 768685c1..909db5e4 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import arrow.core.Option
import com.google.protobuf.util.JsonFormat
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
-import org.onap.ves.VesEventV5.VesEvent.*
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import javax.json.JsonObject
/**
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index fec2609e..5d1f56dc 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -31,10 +31,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVA
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.VesEvent
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.charset.Charset
@@ -79,10 +78,6 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
- private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray {
- return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString())
- }
-
private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray()
}
@@ -90,7 +85,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvRanMeasFields(payload)
+ .setHvMeasFields(payload)
.build()
private fun oversizedPayload() =
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
index acdaf19f..ef7eefa6 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt
@@ -20,10 +20,8 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
-import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
import java.util.*
+import kotlin.streams.asSequence
internal class PayloadGenerator {
@@ -32,34 +30,12 @@ internal class PayloadGenerator {
fun generateRawPayload(size: Int): ByteString =
ByteString.copyFrom(ByteArray(size))
- fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload {
- val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject)
- return HVRanMeasPayload.newBuilder()
- .addPmObject(pmObject)
- .build()
- }
+ fun generatePayload(numOfCountMeasurements: Long = 2): ByteString =
+ ByteString.copyFrom(
+ randomGenerator.ints(numOfCountMeasurements, 0, 256)
+ .asSequence()
+ .toString()
+ .toByteArray()
+ )
- private fun generatePmObject(numOfCountPerMeas: Long, numOfMeasPerObject: Int): PMObject {
- val hvRanMeasList = MutableList(numOfMeasPerObject) { generateHvRanMeas(numOfCountPerMeas) }
- val finalUriName = URI_BASE_NAME + randomGenerator.nextInt(UPPER_URI_NUMBER_BOUND)
- return HVRanMeasPayload.PMObject.newBuilder()
- .setUri(finalUriName)
- .addAllHvRanMeas(hvRanMeasList.asIterable())
- .build()
- }
-
- private fun generateHvRanMeas(numOfCountPerMeas: Long): HVRanMeas {
- return HVRanMeasPayload.PMObject.HVRanMeas.newBuilder()
- .setMeasurementId(randomGenerator.nextInt())
- .addAllCounterSubid(Iterable { randomGenerator.ints(numOfCountPerMeas).iterator() })
- .addAllCounterValue(Iterable { randomGenerator.longs(numOfCountPerMeas).iterator() })
- .setSuspectFlagIncomplete(false)
- .setSuspectFlagOutOfSync(false)
- .build()
- }
-
- companion object {
- private const val URI_BASE_NAME = "sample/uri"
- private const val UPPER_URI_NUMBER_BOUND = 10_000
- }
}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
index c16459ce..ce394cc1 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt
@@ -27,8 +27,9 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import java.io.ByteArrayInputStream
import javax.json.Json
import kotlin.test.fail
@@ -40,7 +41,7 @@ class CommonEventHeaderParserTest : Spek({
given("valid header in JSON format") {
val commonEventHeader = commonHeader(
- domain = VesEventV5.VesEvent.CommonEventHeader.Domain.STATE_CHANGE,
+ domain = STATE_CHANGE,
id = "sample-event-id")
val json = JsonFormat.printer().print(commonEventHeader).byteInputStream()
@@ -75,7 +76,7 @@ class CommonEventHeaderParserTest : Spek({
}
})
-fun assertFailed(result: Option<VesEventV5.VesEvent.CommonEventHeader>) =
+fun assertFailed(result: Option<CommonEventHeader>) =
result.fold({}, { fail() })
fun jsonObject(json: ByteArrayInputStream) = Json.createReader(json).readObject() \ No newline at end of file
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
index f13a33bf..ea3d094a 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
@@ -30,15 +30,15 @@ import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.ves.VesEventV5.VesEvent
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.test.test
/**
@@ -54,7 +54,7 @@ object MessageGeneratorImplTest : Spek({
val limit = 1000L
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.VALID
)))
.take(limit)
@@ -67,7 +67,7 @@ object MessageGeneratorImplTest : Spek({
it("should create message flux of specified size") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.VALID,
5
)))
@@ -88,7 +88,7 @@ object MessageGeneratorImplTest : Spek({
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
}
@@ -98,7 +98,7 @@ object MessageGeneratorImplTest : Spek({
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.TOO_BIG_PAYLOAD,
1
)))
@@ -106,7 +106,7 @@ object MessageGeneratorImplTest : Spek({
.assertNext {
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.verifyComplete()
}
@@ -115,7 +115,7 @@ object MessageGeneratorImplTest : Spek({
it("should create flux of messages with invalid payload") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.INVALID_GPB_DATA,
1
)))
@@ -133,7 +133,7 @@ object MessageGeneratorImplTest : Spek({
it("should create flux of messages with invalid version") {
generator
.createMessageFlux(listOf(MessageParameters(
- commonHeader(HVRANMEAS),
+ commonHeader(HVMEAS),
MessageType.INVALID_WIRE_FRAME,
1
)))
@@ -141,7 +141,7 @@ object MessageGeneratorImplTest : Spek({
.assertNext {
assertThat(it.isValid()).isFalse()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR)
}
.verifyComplete()
@@ -160,7 +160,7 @@ object MessageGeneratorImplTest : Spek({
assertThat(it.isValid()).isTrue()
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
}
@@ -170,7 +170,7 @@ object MessageGeneratorImplTest : Spek({
it("should create concatenated flux of messages") {
val singleFluxSize = 5L
val messageParameters = listOf(
- MessageParameters(commonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize),
+ MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize),
MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize),
MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize)
)
@@ -178,17 +178,17 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
- assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT)
+ assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
}
.expectNextCount(singleFluxSize - 1)
.verifyComplete()
@@ -197,10 +197,10 @@ object MessageGeneratorImplTest : Spek({
}
})
-fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader {
- return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
-}
+fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+
+
+fun extractHvRanMeasFields(bytes: ByteData): ByteString =
+ VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields
-fun extractHvRanMeasFields(bytes: ByteData): ByteString {
- return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields
-}
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
index 3695ca4d..2b41e290 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
@@ -26,50 +26,27 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
-private const val DEFAULT_MEASUREMENTS_NUMBER = 2
-private const val DEFAULT_COUNTERS_NUMBER = 2
-
-private val uriRegex = """sample/uri(\d+)""".toRegex()
-
object PayloadGeneratorTest : Spek({
given("payload factory object") {
val payloadGenerator = PayloadGenerator()
- on("two generated payloads") {
- val generatedPayload0 = payloadGenerator.generatePayload()
- val generatedPayload1 = payloadGenerator.generatePayload()
- it("URIs should have different names") {
- val matchResult0 = uriRegex.find(generatedPayload0.getPmObject(0).uri)!!.value
- val matchResult1 = uriRegex.find(generatedPayload1.getPmObject(0).uri)!!.value
- assertThat(matchResult0 != matchResult1).isTrue()
- }
- }
+ on("raw payload generation") {
+ val size = 100
+ val generatedPayload = payloadGenerator.generateRawPayload(size)
- on("call with default parameters") {
- val generatedPayload = payloadGenerator.generatePayload()
- it("should contain default numbers of measurements") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(DEFAULT_MEASUREMENTS_NUMBER)
- }
- it("should contain default numbers of counters in measurement") {
- assertThat(generatedPayload.getPmObject(0).getHvRanMeas(0).counterSubidCount).isEqualTo(DEFAULT_COUNTERS_NUMBER)
+ it("should generate sequence of zeros") {
+ assertThat(generatedPayload.size()).isEqualTo(size)
+ assertThat(generatedPayload.toByteArray()).isEqualTo(ByteArray(size))
}
}
- on("call with specified parameters") {
- val numOfCountPerMeas: Long = 5
- val numOfMeasPerObject = 10
- val generatedPayload = payloadGenerator.generatePayload(numOfCountPerMeas, numOfMeasPerObject)
- it("should contain specified number of measurements") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(numOfMeasPerObject)
- }
- it("measurement should contain specified number of counters") {
- assertThat(generatedPayload.getPmObject(0).hvRanMeasList
- .filter { numOfCountPerMeas.toInt() == it.counterSubidCount }
- .size)
- .isEqualTo(numOfMeasPerObject)
+ on("two generated payloads") {
+ val generatedPayload0 = payloadGenerator.generatePayload()
+ val generatedPayload1 = payloadGenerator.generatePayload()
+ it("should be different") {
+ assertThat(generatedPayload0 != generatedPayload1).isTrue()
}
-
}
}
})
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt
index 88550603..45e936a6 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt
@@ -21,73 +21,81 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
import javax.json.Json
-private const val validMessageParameters = "[\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messageType\": \"VALID\",\n" +
- " \"messagesAmount\": 25000\n" +
- " },\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messageType\": \"TOO_BIG_PAYLOAD\",\n" +
- " \"messagesAmount\": 100\n" +
- " }\n" +
- "]"
+private const val validMessageParameters =
+"""[
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "VALID",
+ "messagesAmount": 25000
+ },
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messageType": "TOO_BIG_PAYLOAD",
+ "messagesAmount": 100
+ }
+ ]
+"""
-private const val invalidMessageParameters = "[\n" +
- " {\n" +
- " \"commonEventHeader\": {\n" +
- " \"version\": \"sample-version\",\n" +
- " \"domain\": \"HVRANMEAS\",\n" +
- " \"sequence\": 1,\n" +
- " \"priority\": 1,\n" +
- " \"eventId\": \"sample-event-id\",\n" +
- " \"eventName\": \"sample-event-name\",\n" +
- " \"eventType\": \"sample-event-type\",\n" +
- " \"startEpochMicrosec\": 120034455,\n" +
- " \"lastEpochMicrosec\": 120034455,\n" +
- " \"nfNamingCode\": \"sample-nf-naming-code\",\n" +
- " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" +
- " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" +
- " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" +
- " \"sourceId\": \"sample-source-id\",\n" +
- " \"sourceName\": \"sample-source-name\"\n" +
- " },\n" +
- " \"messagesAmount\": 3\n" +
- " }\n" +
- "]"
+private const val invalidMessageParameters =
+"""
+ [
+ {
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": "HVMEAS",
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name",
+ "vesEventListenerVersion": "another-version"
+ },
+ "messagesAmount": 3
+ }
+ ]
+"""
fun validMessagesParametesJson() = Json
.createReader(validMessageParameters.reader())