diff options
36 files changed, 535 insertions, 475 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index 8affa0b1..a4a4374c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader internal object MessageValidator { diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index a7780109..1d43588f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -23,7 +23,7 @@ import arrow.core.Try import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventOuterClass.VesEvent /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d8ea45d6..d08ad9e9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -116,7 +115,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, for (route in routing) { val routeObj = route.asJsonObject() defineRoute { - fromDomain(forNumber(routeObj.getInt("fromDomain"))) + fromDomain(routeObj.getString("fromDomain")) toTopic(routeObj.getString("toTopic")) withFixedPartitioning() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index b611e9aa..a0c22418 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index a00a02d2..18191952 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 03996fd5..f5bfcce1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.impl.MessageValidator -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index e9cd5f3f..a42b982f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,8 +20,7 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { @@ -29,7 +28,7 @@ data class Routing(val routes: List<Route>) { Option.fromNullable(routes.find { it.applies(commonHeader) }) } -data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { +data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain @@ -63,11 +62,11 @@ class RoutingBuilder { class RouteBuilder { - private lateinit var domain: Domain + private lateinit var domain: String private lateinit var targetTopic: String private lateinit var partitioning: (CommonEventHeader) -> Int - fun fromDomain(domain: Domain) { + fun fromDomain(domain: String) { this.domain = domain } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index 213f4544..443dfa2f 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -25,13 +25,14 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder + +import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority +import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance +import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder internal object MessageValidatorTest : Spek({ @@ -46,8 +47,7 @@ internal object MessageValidatorTest : Spek({ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() } - Domain.values() - .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) } + VesEventDomain.values() .forEach { domain -> it("should accept message with $domain domain") { val header = commonHeader(domain) @@ -65,26 +65,8 @@ internal object MessageValidatorTest : Spek({ } } - - val domainTestCases = mapOf( - Domain.DOMAIN_UNDEFINED to false, - Domain.FAULT to true - ) - - domainTestCases.forEach { value, expectedResult -> - on("ves hv message including header with domain $value") { - val commonEventHeader = commonHeader(value) - val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) - - it("should resolve validation result") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation results") - .isEqualTo(expectedResult) - } - } - } - val priorityTestCases = mapOf( - Priority.PRIORITY_UNDEFINED to false, + Priority.PRIORITY_NOT_PROVIDED to false, Priority.HIGH to true ) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 91fa7c19..c2fe1cc5 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -27,11 +27,14 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -42,13 +45,13 @@ object RouterTest : Spek({ val config = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) + fromDomain(HVMEAS.name) toTopic("ves_rtpm") withFixedPartitioning(2) } defineRoute { - fromDomain(Domain.SYSLOG) + fromDomain(SYSLOG.name) toTopic("ves_trace") withFixedPartitioning() } @@ -56,7 +59,7 @@ object RouterTest : Spek({ val cut = Router(config) on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY) + val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -77,7 +80,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY) + val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -98,7 +101,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY) + val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) val result = cut.findDestination(message) it("should not have route available") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index a7d3971e..8950a557 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -41,7 +41,7 @@ internal object VesDecoderTest : Spek({ val cut = VesDecoder() on("ves hv message bytes") { - val commonHeader = commonHeader(Domain.HEARTBEAT) + val commonHeader = commonHeader(HEARTBEAT) val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) it("should decode only header and pass it on along with raw message") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 59224cca..f21a2ecf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,9 +28,11 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -62,14 +64,14 @@ internal object ConsulConfigurationProviderTest : Spek({ StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { - assertEquals("kafka:9093", it.kafkaBootstrapServers) + assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) val route1 = it.routing.routes[0] - assertEquals(Domain.FAULT, route1.domain) + assertEquals(FAULT.name, route1.domain) assertEquals("test-topic-1", route1.targetTopic) val route2 = it.routing.routes[1] - assertEquals(Domain.HEARTBEAT, route2.domain) + assertEquals(HEARTBEAT.name, route2.domain) assertEquals("test-topic-2", route2.targetTopic) }.verifyComplete() @@ -95,7 +97,7 @@ internal object ConsulConfigurationProviderTest : Spek({ .verifyErrorMessage("Test exception") } - it("should update the health state"){ + it("should update the health state") { StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) @@ -128,21 +130,23 @@ private fun constructConsulConfigProvider(url: String, } +const val kafkaAddress = "message-router-kafka" + fun constructConsulResponse(): String { val config = """{ - "dmaap.kafkaBootstrapServers": "kafka:9093", + "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", "collector.routing": [ { - "fromDomain": 1, + "fromDomain": "FAULT", "toTopic": "test-topic-1" }, { - "fromDomain": 2, + "fromDomain": "HEARTBEAT", "toTopic": "test-topic-2" } ] -}""" + }""" val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index ba29844a..8e6103c9 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -29,6 +29,7 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -36,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS import reactor.core.publisher.Flux import reactor.math.sum import java.security.MessageDigest @@ -62,7 +62,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong()) val params = MessageParameters( - commonEventHeader = commonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVMEAS), messageType = VALID, amount = numMessages ) @@ -92,7 +92,7 @@ object PerformanceSpecification : Spek({ val timeout = Duration.ofSeconds(30) val params = MessageParameters( - commonEventHeader = commonHeader(HVRANMEAS), + commonEventHeader = commonHeader(HVMEAS), messageType = VALID, amount = numMessages ) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index a9f3e9a4..60e10ee0 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,9 +24,13 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC -import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.HVMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -39,7 +43,7 @@ import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.Flux import java.time.Duration @@ -54,8 +58,8 @@ object VesHvSpecification : Spek({ it("should handle multiple HV RAN events") { val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HVRANMEAS) + vesWireFrameMessage(HVMEAS), + vesWireFrameMessage(HVMEAS) ) assertThat(messages) @@ -65,8 +69,8 @@ object VesHvSpecification : Spek({ it("should not handle messages received from client after end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) - val anotherValidMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) + val anotherValidMessage = vesWireFrameMessage(HVMEAS) val endOfTransmissionMessage = endOfTransmissionWireMessage() val handledEvents = sut.handleConnection(sink, @@ -91,23 +95,19 @@ object VesHvSpecification : Spek({ describe("Memory management") { it("should release memory for each handled and dropped message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) - val msgWithInvalidDomain = vesWireFrameMessage(Domain.OTHER) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithTooBigPayload(Domain.HVRANMEAS) + val msgWithTooBigPayload = vesMessageWithTooBigPayload(HVMEAS) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( - sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame, msgWithTooBigPayload) + sink, validMessage, msgWithInvalidFrame, msgWithTooBigPayload) assertThat(handledEvents).hasSize(1) assertThat(validMessage.refCnt()) .describedAs("handled message should be released") .isEqualTo(expectedRefCnt) - assertThat(msgWithInvalidDomain.refCnt()) - .describedAs("message with invalid domain should be released") - .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") .isEqualTo(expectedRefCnt) @@ -118,7 +118,7 @@ object VesHvSpecification : Spek({ it("should release memory for end-of-transmission message") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val endOfTransmissionMessage = endOfTransmissionWireMessage() val expectedRefCnt = 0 @@ -138,7 +138,7 @@ object VesHvSpecification : Spek({ it("should release memory for each message with invalid payload") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithInvalidPayload = wireFrameMessageWithInvalidPayload() val expectedRefCnt = 0 @@ -157,7 +157,7 @@ object VesHvSpecification : Spek({ it("should release memory for each message with garbage frame") { val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(Domain.HVRANMEAS) + val validMessage = vesWireFrameMessage(HVMEAS) val msgWithGarbageFrame = garbageFrame() val expectedRefCnt = 0 @@ -179,11 +179,11 @@ object VesHvSpecification : Spek({ it("should direct message to a topic by means of routing configuration") { val (sut, sink) = vesHvWithStoringSink() - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC) assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } @@ -193,17 +193,17 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(twoDomainsToOneTopicConfiguration) val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HEARTBEAT), - vesWireFrameMessage(Domain.MEASUREMENTS_FOR_VF_SCALING)) + vesWireFrameMessage(HVMEAS), + vesWireFrameMessage(HEARTBEAT), + vesWireFrameMessage(MEASUREMENT)) assertThat(messages).describedAs("number of routed messages").hasSize(3) assertThat(messages[0].topic).describedAs("first message topic") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(messages[1].topic).describedAs("second message topic") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(messages[2].topic).describedAs("last message topic") .isEqualTo(MEASUREMENTS_FOR_VF_SCALING_TOPIC) @@ -212,14 +212,14 @@ object VesHvSpecification : Spek({ it("should drop message if route was not found") { val (sut, sink) = vesHvWithStoringSink() val messages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.OTHER, "first"), - vesWireFrameMessage(Domain.HVRANMEAS, "second"), - vesWireFrameMessage(Domain.HEARTBEAT, "third")) + vesWireFrameMessage(OTHER, "first"), + vesWireFrameMessage(HVMEAS, "second"), + vesWireFrameMessage(HEARTBEAT, "third")) assertThat(messages).describedAs("number of routed messages").hasSize(1) val msg = messages[0] - assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVMEAS_TOPIC) assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") } } @@ -253,41 +253,41 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithoutRouting) - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).isEmpty() sut.configurationProvider.updateConfiguration(basicConfiguration) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messagesAfterUpdate).hasSize(1) val message = messagesAfterUpdate[0] assertThat(message.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(message.partition).describedAs("routed message partition") .isEqualTo(0) } it("should change domain routing") { - val messages = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messages = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messages).hasSize(1) val firstMessage = messages[0] assertThat(firstMessage.topic).describedAs("routed message topic on initial configuration") - .isEqualTo(HVRANMEAS_TOPIC) + .isEqualTo(HVMEAS_TOPIC) assertThat(firstMessage.partition).describedAs("routed message partition") .isEqualTo(0) sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) - val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + val messagesAfterUpdate = sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) assertThat(messagesAfterUpdate).hasSize(2) val secondMessage = messagesAfterUpdate[1] assertThat(secondMessage.topic).describedAs("routed message topic after configuration's change") - .isEqualTo(ALTERNATE_HVRANMEAS_TOPIC) + .isEqualTo(ALTERNATE_HVMEAS_TOPIC) assertThat(secondMessage.partition).describedAs("routed message partition") .isEqualTo(0) } @@ -302,13 +302,13 @@ object VesHvSpecification : Spek({ sut.configurationProvider.updateConfiguration(configurationWithDifferentRouting) } }.doOnNext { - sut.handleConnection(sink, vesWireFrameMessage(Domain.HVRANMEAS)) + sut.handleConnection(sink, vesWireFrameMessage(HVMEAS)) }.then().block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC } assertThat(messages.size).isEqualTo(messagesAmount) assertThat(messagesForEachTopic) @@ -329,14 +329,14 @@ object VesHvSpecification : Spek({ println("config changed") } } - .map { vesWireFrameMessage(Domain.HVRANMEAS) } + .map { vesWireFrameMessage(HVMEAS) } sut.collector.handleConnection(sut.alloc, incomingMessages).block(defaultTimeout) val messages = sink.sentMessages - val firstTopicMessagesCount = messages.count { it.topic == HVRANMEAS_TOPIC } - val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVRANMEAS_TOPIC } + val firstTopicMessagesCount = messages.count { it.topic == HVMEAS_TOPIC } + val secondTopicMessagesCount = messages.count { it.topic == ALTERNATE_HVMEAS_TOPIC } assertThat(messages.size).isEqualTo(messageStreamSize) assertThat(firstTopicMessagesCount) @@ -373,9 +373,9 @@ object VesHvSpecification : Spek({ val (sut, sink) = vesHvWithStoringSink() val handledMessages = sut.handleConnection(sink, - vesWireFrameMessage(Domain.HVRANMEAS, "first"), - vesMessageWithTooBigPayload(Domain.HVRANMEAS), - vesWireFrameMessage(Domain.HVRANMEAS)) + vesWireFrameMessage(HVMEAS, "first"), + vesMessageWithTooBigPayload(HVMEAS), + vesWireFrameMessage(HVMEAS)) assertThat(handledMessages).hasSize(1) assertThat(handledMessages.first().message.header.eventId).isEqualTo("first") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt index ebeaa69e..688f2758 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -20,24 +20,27 @@ package org.onap.dcae.collectors.veshv.tests.fakes import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.routing -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor import reactor.retry.RetryExhaustedException -const val HVRANMEAS_TOPIC = "ves_hvRanMeas" +const val HVMEAS_TOPIC = "ves_hvRanMeas" const val MEASUREMENTS_FOR_VF_SCALING_TOPIC = "ves_hvMeasForVfScaling" -const val ALTERNATE_HVRANMEAS_TOPIC = "ves_alternateHvRanMeas" +const val ALTERNATE_HVMEAS_TOPIC = "ves_alternateHvRanMeas" val basicConfiguration: CollectorConfiguration = CollectorConfiguration( kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } }.build() @@ -47,17 +50,17 @@ val twoDomainsToOneTopicConfiguration: CollectorConfiguration = CollectorConfigu kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } defineRoute { - fromDomain(Domain.HEARTBEAT) - toTopic(HVRANMEAS_TOPIC) + fromDomain(HEARTBEAT.name) + toTopic(HVMEAS_TOPIC) withFixedPartitioning() } defineRoute { - fromDomain(Domain.MEASUREMENTS_FOR_VF_SCALING) + fromDomain(MEASUREMENT.name) toTopic(MEASUREMENTS_FOR_VF_SCALING_TOPIC) withFixedPartitioning() } @@ -69,8 +72,8 @@ val configurationWithDifferentRouting: CollectorConfiguration = CollectorConfigu kafkaBootstrapServers = "localhost:9969", routing = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) - toTopic(ALTERNATE_HVRANMEAS_TOPIC) + fromDomain(HVMEAS.name) + toTopic(ALTERNATE_HVMEAS_TOPIC) withFixedPartitioning() } }.build() diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 354edaeb..51f94cc4 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -30,7 +30,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventV5 +import org.onap.ves.VesEventOuterClass import java.io.InputStream import javax.json.Json @@ -68,22 +68,22 @@ class MessageStreamValidation( parameters.all { it.messageType == MessageType.FIXED_PAYLOAD } - private fun validateHeaders(actual: List<VesEventV5.VesEvent>, expected: List<VesEventV5.VesEvent>): Boolean { + private fun validateHeaders(actual: List<VesEventOuterClass.VesEvent>, expected: List<VesEventOuterClass.VesEvent>): Boolean { val consumedHeaders = actual.map { it.commonEventHeader } val generatedHeaders = expected.map { it.commonEventHeader } return generatedHeaders == consumedHeaders } - private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventV5.VesEvent>> = + private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = messageGenerator.createMessageFlux(parameters) .map(PayloadWireFrameMessage::payload) .map(ByteData::unsafeAsArray) - .map(VesEventV5.VesEvent::parseFrom) + .map(VesEventOuterClass.VesEvent::parseFrom) .collectList() .asIo() private fun decodeConsumedEvents(consumedMessages: List<ByteArray>) = - consumedMessages.map(VesEventV5.VesEvent::parseFrom) + consumedMessages.map(VesEventOuterClass.VesEvent::parseFrom) } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index c0ba5812..017360bb 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -36,8 +36,8 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.mockito.ArgumentMatchers.anySet import org.mockito.Mockito -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent +import org.onap.ves.VesEventOuterClass.CommonEventHeader import java.util.concurrent.ConcurrentLinkedQueue /** @@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P return VesEvent.newBuilder() .setCommonEventHeader(CommonEventHeader.newBuilder() .setEventId(eventId)) - .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .setHvMeasFields(ByteString.copyFrom(payload.toByteArray())) .build() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 2932367b..beef26b6 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -20,17 +20,10 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.Either -import arrow.core.Left -import arrow.core.None import arrow.core.Right -import arrow.core.Some -import arrow.effects.IO -import javax.json.stream.JsonParsingException import com.google.protobuf.ByteString import com.nhaarman.mockito_kotlin.any import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.never -import com.nhaarman.mockito_kotlin.verify import com.nhaarman.mockito_kotlin.whenever import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.fail @@ -38,19 +31,15 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.mockito.ArgumentMatchers.anyList -import org.mockito.ArgumentMatchers.anySet import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux -import java.util.concurrent.ConcurrentLinkedQueue -import javax.json.Json -import javax.json.JsonArray -import javax.json.JsonValue +import javax.json.stream.JsonParsingException /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -216,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P return VesEvent.newBuilder() .setCommonEventHeader(CommonEventHeader.newBuilder() .setEventId(eventId)) - .setHvRanMeasFields(ByteString.copyFrom(payload.toByteArray())) + .setHvMeasFields(ByteString.copyFrom(payload.toByteArray())) .build() } diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml index eb2a7582..8f6ec874 100644 --- a/hv-collector-domain/pom.xml +++ b/hv-collector-domain/pom.xml @@ -74,7 +74,7 @@ <configuration> <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact> <inputDirectories> - <include>${project.basedir}/src/main/proto</include> + <include>${project.basedir}/src/main/proto/event</include> </inputDirectories> <outputTargets> <outputTarget> diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt new file mode 100644 index 00000000..3e99cdc8 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesEventDomain.kt @@ -0,0 +1,35 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +enum class VesEventDomain { + FAULT, + HEARTBEAT, + MEASUREMENT, + MOBILE_FLOW, + OTHER, + PNFREGISTRATION, + SIP_SIGNALING, + STATE_CHANGE, + SYSLOG, + THRESHOLD_CROSSING_ALERT, + VOICE_QUALITY, + HVMEAS +} diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt index 91c75459..339a652c 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/validation.kt @@ -19,17 +19,18 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.ves.VesEventV5 +import org.onap.ves.VesEventOuterClass val headerRequiredFieldDescriptors = listOf( "version", - "eventName", "domain", - "eventId", - "sourceName", - "reportingEntityName", + "sequence", "priority", - "startEpochMicrosec", + "eventId", + "eventName", "lastEpochMicrosec", - "sequence") - .map { fieldName -> VesEventV5.VesEvent.CommonEventHeader.getDescriptor().findFieldByName(fieldName) }
\ No newline at end of file + "startEpochMicrosec", + "reportingEntityName", + "sourceName", + "vesEventListenerVersion") + .map { fieldName -> VesEventOuterClass.CommonEventHeader.getDescriptor().findFieldByName(fieldName) } diff --git a/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto b/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto deleted file mode 100644 index 5121f0eb..00000000 --- a/hv-collector-domain/src/main/proto/HVRanMeasFields-v5.proto +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -syntax = "proto3"; -package org.onap.ves; - -// Definition for RTPM - -message HVRanMeasFields { - message HVRanMeasPayload { - message PMObject { - message HVRanMeas { - uint32 measurement_id = 1; - repeated uint32 counter_subid = 2; - repeated sint64 counter_value = 3; - repeated uint32 missing_counter_subid = 4; - bool suspectFlagIncomplete = 5; // (some is data missing due to internal error) - bool suspectFlagOutOfSync = 6; // (source time not aligned) - } - - string uri = 1; // monitored object URI - repeated HVRanMeas hvRanMeas = 2; // performance counters grouped by measurement types - } - repeated PMObject pmObject = 1; - } - - message AdditionalField { - string name = 1; - string value = 2; - } - - string hvRanMeasFieldsVersion = 1; // version of HVRanMeasFields message - uint32 period_ms = 2; // period configured for reporting the data in milliseconds - string timezone = 3; // timezone of Network Function sending the data - string pmDictionaryVsn = 4; // vendor name + schema version E.g. NOKIA_LN7.0, uniquely identify the relevant PM dictionary - HVRanMeasPayload hvRanMeasPayload = 5; // objects being monitored - repeated AdditionalField additionalFields = 6; // array of name-value pairs if needed -}
\ No newline at end of file diff --git a/hv-collector-domain/src/main/proto/VesEvent-v5.proto b/hv-collector-domain/src/main/proto/VesEvent-v5.proto deleted file mode 100644 index 340133b2..00000000 --- a/hv-collector-domain/src/main/proto/VesEvent-v5.proto +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -syntax = "proto3"; -package org.onap.ves; - -message VesEvent { - - // VES CommonEventHeader adapted to GPB (Google Protocol Buffers) - // Source: https://git.opnfv.org/ves/tree/tests/docs/ves_data_model.json - // 2017-05-13 Align with VES 5.0 schema. - // blob: ca948ff67e8a2de4e2a47cffc4d4d2893170ab76 - - message CommonEventHeader { - string version = 1; // required, "version of the event header" - enum Domain { - DOMAIN_UNDEFINED = 0; - FAULT = 1; - HEARTBEAT = 2; - MEASUREMENTS_FOR_VF_SCALING = 3; - MOBILE_FLOW = 4; - SIP_SIGNALING = 5; - STATE_CHANGE = 6; - SYSLOG = 7; - THRESHOLD_CROSSING_ALERT = 8; - VOICE_QUALITY = 9; - OTHER = 10; - HVRANMEAS = 11; - } - Domain domain = 2; // required, "the eventing domain associated with the event" [map to string] - - uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" - - enum Priority { - PRIORITY_UNDEFINED = 0; - HIGH = 1; - MEDIUM = 2; - NORMAL = 3; - LOW = 4; - } - Priority priority = 4; // required, "processing priority" - - string eventId = 5; // required, "event key that is unique to the event source" - string eventName = 6; // required, "unique event name" - string eventType = 7; // "for example - applicationVnf, guestOS, hostOS, platform" - - uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - - string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" - string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" - - string reportingEntityId = 12; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" - bytes reportingEntityName = 13; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName" - bytes sourceId = 14; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" - string sourceName = 15; // required, "name of the entity experiencing the event issue" - - reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" - reserved 100; - } - - CommonEventHeader commonEventHeader = 1; - - oneof eventFields // required, payload, each high-volume domain has its specific GPB schema - { - bytes hvRanMeasFields = 2; // if domain==HVRANMEAS, GPB schema: HVRanMeasFields.proto - } -} - -message VesEventList { - repeated VesEvent vesEvent = 1; -} diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto new file mode 100644 index 00000000..54a6d149 --- /dev/null +++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +syntax = "proto3"; +package org.onap.ves; + +message VesEvent { + CommonEventHeader commonEventHeader = 1; // required + + oneof eventFields // required, payload + { + // each new high-volume domain can add an entry for its own GPB message + // the field can be opaque (bytes) to allow decoding the payload in a separate step + bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields + } +} + +// VES CommonEventHeader adapted to GPB (Google Protocol Buffers) +// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain + +message CommonEventHeader { + string version = 1; // required, "version of the gpb common event header" + string domain = 2; // required, "the eventing domain associated with the event", allowed values: + // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, + // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS + + uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" + + enum Priority { + PRIORITY_NOT_PROVIDED = 0; + HIGH = 1; + MEDIUM = 2; + NORMAL = 3; + LOW = 4; + } + Priority priority = 4; // required, "processing priority" + + string eventId = 5; // required, "event key that is unique to the event source" + string eventName = 6; // required, "unique event name" + string eventType = 7; // "for example - guest05, platform" + + uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + + string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" + string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" + string nfVendorName = 12; // " Vendor Name providing the nf " + + bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" + string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" + bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" + string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" + string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" + string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" + + reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" + reserved 100; +} diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto new file mode 100644 index 00000000..9a8582d5 --- /dev/null +++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto @@ -0,0 +1,43 @@ +/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package org.onap.ves;
+import "MeasDataCollection.proto"; // for 3GPP PM format
+
+message HVMeasFields
+{
+ string hvMeasFieldsVersion = 1;
+ measDataCollection.MeasDataCollection measDataCollection = 2;
+ // From 3GPP TS 28.550
+ // Informative: mapping between similar header fields (format may be different)
+ // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader
+ // senderName sourceName
+ // senderType nfNamingCode + nfcNamingCode
+ // vendorName nfVendorName
+ // collectionBeginTime startEpochMicrosec
+ // timestamp lastEpochMicrosec
+ repeated HashMap eventAddlFlds = 3; // optional per-event data
+}
+
+message HashMap
+{
+ string name = 1;
+ string value = 2;
+}
\ No newline at end of file diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto new file mode 100644 index 00000000..472dcc43 --- /dev/null +++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto @@ -0,0 +1,104 @@ +/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+syntax = "proto3";
+package measDataCollection;
+
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).
+// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
+// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.
+// Differences/additions to 3GPP TS 28.550 are marked with "%%".
+
+message MeasDataCollection // top-level message
+{
+ MeasHeader measHeader = 1;
+ repeated MeasData measData = 2; // %%: use a single instance for RTPM
+ MeasFooter measFooter = 3;
+}
+
+message MeasHeader
+{
+ string streamFormatVersion = 1;
+ string senderName = 2;
+ string senderType = 3;
+ string vendorName = 4;
+ string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)
+}
+
+message MeasData
+{
+ string measuredEntityId = 1; // DN as per 3GPP TS 32.300
+ string measuredEntityUserName = 2; // network function User Name
+ string measuredEntitySoftwareVersion = 3;
+ uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)
+ repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 6;
+}
+
+
+message MeasInfo
+{
+ oneof MeasInfoId { // measurement group identifier
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
+ }
+
+ oneof MeasTypes { // measurement identifiers associated with the measurement results
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ }
+ // Needed only because GPB does not support repeated fields directly inside 'oneof'
+ message IMeasTypes { repeated uint32 iMeasType = 1; }
+ message SMeasTypes { repeated string measType = 1; }
+
+ string jobIdList = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups
+}
+
+message MeasValue
+{
+ oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ }
+ repeated MeasResult measResults = 3;
+ bool suspectFlag = 4;
+ repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data
+}
+
+message MeasResult
+{
+ uint32 p = 1; // Optional index in the MeasTypes array
+ oneof xValue {
+ sint64 iValue = 2;
+ double rValue = 3;
+ bool isNull = 4;
+ }
+}
+
+message MeasFooter
+{
+ string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"
+}
+
+message nameValue // %%: vendor-defined name-value pair
+{
+ string name = 1;
+ string value = 2;
+}
\ No newline at end of file diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index fa63c36e..b992d530 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.domain import arrow.core.Either import io.netty.buffer.ByteBuf import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.ObjectAssert import org.jetbrains.spek.api.Spek diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index c6aa89b2..78042260 100644 --- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt @@ -25,7 +25,10 @@ import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER + import java.util.UUID.randomUUID @@ -39,7 +42,7 @@ private fun ByteBuf.writeValidWireFrameHeaders() { writeByte(0x01) // content type = GPB } -fun vesWireFrameMessage(domain: Domain = Domain.OTHER, +fun vesWireFrameMessage(domain: VesEventDomain = OTHER, id: String = randomUUID().toString()): ByteBuf = allocator.buffer().run { writeValidWireFrameHeaders() @@ -70,7 +73,7 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0x01) // content type = GPB } -fun vesMessageWithTooBigPayload(domain: Domain = Domain.DOMAIN_UNDEFINED): ByteBuf = +fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf = allocator.buffer().run { writeValidWireFrameHeaders() diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index 6aeb6206..0341c2ff 100644 --- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -23,27 +23,31 @@ package org.onap.dcae.collectors.veshv.tests.utils import com.google.protobuf.ByteString import com.google.protobuf.MessageLite import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.ves.VesEventV5 +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.ves.VesEventOuterClass +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority import java.util.UUID.randomUUID -fun vesEvent(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS, +fun vesEvent(domain: VesEventDomain = HVMEAS, id: String = randomUUID().toString(), hvRanMeasFields: ByteString = ByteString.EMPTY -): VesEventV5.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields) +): VesEventOuterClass.VesEvent = vesEvent(commonHeader(domain, id), hvRanMeasFields) -fun vesEvent(commonEventHeader: VesEventV5.VesEvent.CommonEventHeader, - hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventV5.VesEvent = - VesEventV5.VesEvent.newBuilder() +fun vesEvent(commonEventHeader: CommonEventHeader, + hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent = + VesEventOuterClass.VesEvent.newBuilder() .setCommonEventHeader(commonEventHeader) - .setHvRanMeasFields(hvRanMeasFields) + .setHvMeasFields(hvRanMeasFields) .build() -fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS, +fun commonHeader(domain: VesEventDomain = HVMEAS, id: String = randomUUID().toString(), - priority: VesEventV5.VesEvent.CommonEventHeader.Priority = VesEventV5.VesEvent.CommonEventHeader.Priority.NORMAL): VesEventV5.VesEvent.CommonEventHeader = - VesEventV5.VesEvent.CommonEventHeader.newBuilder() + priority: Priority = Priority.NORMAL): CommonEventHeader = + CommonEventHeader.newBuilder() .setVersion("sample-version") - .setDomain(domain) + .setDomain(domain.name) .setSequence(1) .setPriority(priority) .setEventId(id) @@ -53,13 +57,16 @@ fun commonHeader(domain: VesEventV5.VesEvent.CommonEventHeader.Domain = VesEvent .setLastEpochMicrosec(120034455) .setNfNamingCode("sample-nf-naming-code") .setNfcNamingCode("sample-nfc-naming-code") - .setReportingEntityId("sample-reporting-entity-id") - .setReportingEntityName(ByteString.copyFromUtf8("sample-reporting-entity-name")) + .setNfVendorName("vendor-name") + .setReportingEntityId(ByteString.copyFromUtf8("sample-reporting-entity-id")) + .setReportingEntityName("sample-reporting-entity-name") .setSourceId(ByteString.copyFromUtf8("sample-source-id")) .setSourceName("sample-source-name") + .setTimeZoneOffset("+1") + .setVesEventListenerVersion("another-version") .build() -fun vesEventBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData = +fun vesEventBytes(commonHeader: CommonEventHeader, byteString: ByteString = ByteString.EMPTY): ByteData = vesEvent(commonHeader, byteString).toByteData() fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt index 8d989cc5..047d863c 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageParameters.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt index 768685c1..909db5e4 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParser.kt @@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl import arrow.core.Option import com.google.protobuf.util.JsonFormat import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors -import org.onap.ves.VesEventV5.VesEvent.* +import org.onap.ves.VesEventOuterClass.CommonEventHeader import javax.json.JsonObject /** diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index fec2609e..5d1f56dc 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -31,10 +31,9 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVA import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventOuterClass.VesEvent +import org.onap.ves.VesEventOuterClass.CommonEventHeader + import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.charset.Charset @@ -79,10 +78,6 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) } - private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: HVRanMeasPayload): ByteArray { - return vesEvent(commonEventHeader, hvRanMeasPayload.toByteString()) - } - private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray { return createVesEvent(commonEventHeader, hvRanMeasPayload).toByteArray() } @@ -90,7 +85,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = VesEvent.newBuilder() .setCommonEventHeader(commonEventHeader) - .setHvRanMeasFields(payload) + .setHvMeasFields(payload) .build() private fun oversizedPayload() = diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt index acdaf19f..ef7eefa6 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGenerator.kt @@ -20,10 +20,8 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject -import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas import java.util.* +import kotlin.streams.asSequence internal class PayloadGenerator { @@ -32,34 +30,12 @@ internal class PayloadGenerator { fun generateRawPayload(size: Int): ByteString = ByteString.copyFrom(ByteArray(size)) - fun generatePayload(numOfCountPerMeas: Long = 2, numOfMeasPerObject: Int = 2): HVRanMeasPayload { - val pmObject = generatePmObject(numOfCountPerMeas, numOfMeasPerObject) - return HVRanMeasPayload.newBuilder() - .addPmObject(pmObject) - .build() - } + fun generatePayload(numOfCountMeasurements: Long = 2): ByteString = + ByteString.copyFrom( + randomGenerator.ints(numOfCountMeasurements, 0, 256) + .asSequence() + .toString() + .toByteArray() + ) - private fun generatePmObject(numOfCountPerMeas: Long, numOfMeasPerObject: Int): PMObject { - val hvRanMeasList = MutableList(numOfMeasPerObject) { generateHvRanMeas(numOfCountPerMeas) } - val finalUriName = URI_BASE_NAME + randomGenerator.nextInt(UPPER_URI_NUMBER_BOUND) - return HVRanMeasPayload.PMObject.newBuilder() - .setUri(finalUriName) - .addAllHvRanMeas(hvRanMeasList.asIterable()) - .build() - } - - private fun generateHvRanMeas(numOfCountPerMeas: Long): HVRanMeas { - return HVRanMeasPayload.PMObject.HVRanMeas.newBuilder() - .setMeasurementId(randomGenerator.nextInt()) - .addAllCounterSubid(Iterable { randomGenerator.ints(numOfCountPerMeas).iterator() }) - .addAllCounterValue(Iterable { randomGenerator.longs(numOfCountPerMeas).iterator() }) - .setSuspectFlagIncomplete(false) - .setSuspectFlagOutOfSync(false) - .build() - } - - companion object { - private const val URI_BASE_NAME = "sample/uri" - private const val UPPER_URI_NUMBER_BOUND = 10_000 - } } diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt index c16459ce..ce394cc1 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/CommonEventHeaderParserTest.kt @@ -27,8 +27,9 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.STATE_CHANGE import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.ves.VesEventV5 +import org.onap.ves.VesEventOuterClass.CommonEventHeader import java.io.ByteArrayInputStream import javax.json.Json import kotlin.test.fail @@ -40,7 +41,7 @@ class CommonEventHeaderParserTest : Spek({ given("valid header in JSON format") { val commonEventHeader = commonHeader( - domain = VesEventV5.VesEvent.CommonEventHeader.Domain.STATE_CHANGE, + domain = STATE_CHANGE, id = "sample-event-id") val json = JsonFormat.printer().print(commonEventHeader).byteInputStream() @@ -75,7 +76,7 @@ class CommonEventHeaderParserTest : Spek({ } }) -fun assertFailed(result: Option<VesEventV5.VesEvent.CommonEventHeader>) = +fun assertFailed(result: Option<CommonEventHeader>) = result.fold({}, { fail() }) fun jsonObject(json: ByteArrayInputStream) = Json.createReader(json).readObject()
\ No newline at end of file diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index f13a33bf..ea3d094a 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -30,15 +30,15 @@ import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.FAULT -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HEARTBEAT -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.HVRANMEAS +import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent import reactor.test.test /** @@ -54,7 +54,7 @@ object MessageGeneratorImplTest : Spek({ val limit = 1000L generator .createMessageFlux(listOf(MessageParameters( - commonHeader(HVRANMEAS), + commonHeader(HVMEAS), MessageType.VALID ))) .take(limit) @@ -67,7 +67,7 @@ object MessageGeneratorImplTest : Spek({ it("should create message flux of specified size") { generator .createMessageFlux(listOf(MessageParameters( - commonHeader(HVRANMEAS), + commonHeader(HVMEAS), MessageType.VALID, 5 ))) @@ -88,7 +88,7 @@ object MessageGeneratorImplTest : Spek({ .assertNext { assertThat(it.isValid()).isTrue() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } .verifyComplete() } @@ -98,7 +98,7 @@ object MessageGeneratorImplTest : Spek({ generator .createMessageFlux(listOf(MessageParameters( - commonHeader(HVRANMEAS), + commonHeader(HVMEAS), MessageType.TOO_BIG_PAYLOAD, 1 ))) @@ -106,7 +106,7 @@ object MessageGeneratorImplTest : Spek({ .assertNext { assertThat(it.isValid()).isTrue() assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) } .verifyComplete() } @@ -115,7 +115,7 @@ object MessageGeneratorImplTest : Spek({ it("should create flux of messages with invalid payload") { generator .createMessageFlux(listOf(MessageParameters( - commonHeader(HVRANMEAS), + commonHeader(HVMEAS), MessageType.INVALID_GPB_DATA, 1 ))) @@ -133,7 +133,7 @@ object MessageGeneratorImplTest : Spek({ it("should create flux of messages with invalid version") { generator .createMessageFlux(listOf(MessageParameters( - commonHeader(HVRANMEAS), + commonHeader(HVMEAS), MessageType.INVALID_WIRE_FRAME, 1 ))) @@ -141,7 +141,7 @@ object MessageGeneratorImplTest : Spek({ .assertNext { assertThat(it.isValid()).isFalse() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR) } .verifyComplete() @@ -160,7 +160,7 @@ object MessageGeneratorImplTest : Spek({ assertThat(it.isValid()).isTrue() assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } .verifyComplete() } @@ -170,7 +170,7 @@ object MessageGeneratorImplTest : Spek({ it("should create concatenated flux of messages") { val singleFluxSize = 5L val messageParameters = listOf( - MessageParameters(commonHeader(HVRANMEAS), MessageType.VALID, singleFluxSize), + MessageParameters(commonHeader(HVMEAS), MessageType.VALID, singleFluxSize), MessageParameters(commonHeader(FAULT), MessageType.TOO_BIG_PAYLOAD, singleFluxSize), MessageParameters(commonHeader(HEARTBEAT), MessageType.VALID, singleFluxSize) ) @@ -178,17 +178,17 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVRANMEAS) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) } .expectNextCount(singleFluxSize - 1) .assertNext { assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } .expectNextCount(singleFluxSize - 1) .assertNext { assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) - assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT) + assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name) } .expectNextCount(singleFluxSize - 1) .verifyComplete() @@ -197,10 +197,10 @@ object MessageGeneratorImplTest : Spek({ } }) -fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader { - return VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader -} +fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader = + VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader + + +fun extractHvRanMeasFields(bytes: ByteData): ByteString = + VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields -fun extractHvRanMeasFields(bytes: ByteData): ByteString { - return VesEvent.parseFrom(bytes.unsafeAsArray()).hvRanMeasFields -} diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt index 3695ca4d..2b41e290 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt @@ -26,50 +26,27 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator -private const val DEFAULT_MEASUREMENTS_NUMBER = 2 -private const val DEFAULT_COUNTERS_NUMBER = 2 - -private val uriRegex = """sample/uri(\d+)""".toRegex() - object PayloadGeneratorTest : Spek({ given("payload factory object") { val payloadGenerator = PayloadGenerator() - on("two generated payloads") { - val generatedPayload0 = payloadGenerator.generatePayload() - val generatedPayload1 = payloadGenerator.generatePayload() - it("URIs should have different names") { - val matchResult0 = uriRegex.find(generatedPayload0.getPmObject(0).uri)!!.value - val matchResult1 = uriRegex.find(generatedPayload1.getPmObject(0).uri)!!.value - assertThat(matchResult0 != matchResult1).isTrue() - } - } + on("raw payload generation") { + val size = 100 + val generatedPayload = payloadGenerator.generateRawPayload(size) - on("call with default parameters") { - val generatedPayload = payloadGenerator.generatePayload() - it("should contain default numbers of measurements") { - assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(DEFAULT_MEASUREMENTS_NUMBER) - } - it("should contain default numbers of counters in measurement") { - assertThat(generatedPayload.getPmObject(0).getHvRanMeas(0).counterSubidCount).isEqualTo(DEFAULT_COUNTERS_NUMBER) + it("should generate sequence of zeros") { + assertThat(generatedPayload.size()).isEqualTo(size) + assertThat(generatedPayload.toByteArray()).isEqualTo(ByteArray(size)) } } - on("call with specified parameters") { - val numOfCountPerMeas: Long = 5 - val numOfMeasPerObject = 10 - val generatedPayload = payloadGenerator.generatePayload(numOfCountPerMeas, numOfMeasPerObject) - it("should contain specified number of measurements") { - assertThat(generatedPayload.getPmObject(0).hvRanMeasCount).isEqualTo(numOfMeasPerObject) - } - it("measurement should contain specified number of counters") { - assertThat(generatedPayload.getPmObject(0).hvRanMeasList - .filter { numOfCountPerMeas.toInt() == it.counterSubidCount } - .size) - .isEqualTo(numOfMeasPerObject) + on("two generated payloads") { + val generatedPayload0 = payloadGenerator.generatePayload() + val generatedPayload1 = payloadGenerator.generatePayload() + it("should be different") { + assertThat(generatedPayload0 != generatedPayload1).isTrue() } - } } }) diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt index 88550603..45e936a6 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt @@ -21,73 +21,81 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl import javax.json.Json -private const val validMessageParameters = "[\n" + - " {\n" + - " \"commonEventHeader\": {\n" + - " \"version\": \"sample-version\",\n" + - " \"domain\": \"HVRANMEAS\",\n" + - " \"sequence\": 1,\n" + - " \"priority\": 1,\n" + - " \"eventId\": \"sample-event-id\",\n" + - " \"eventName\": \"sample-event-name\",\n" + - " \"eventType\": \"sample-event-type\",\n" + - " \"startEpochMicrosec\": 120034455,\n" + - " \"lastEpochMicrosec\": 120034455,\n" + - " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + - " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + - " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + - " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + - " \"sourceId\": \"sample-source-id\",\n" + - " \"sourceName\": \"sample-source-name\"\n" + - " },\n" + - " \"messageType\": \"VALID\",\n" + - " \"messagesAmount\": 25000\n" + - " },\n" + - " {\n" + - " \"commonEventHeader\": {\n" + - " \"version\": \"sample-version\",\n" + - " \"domain\": \"HVRANMEAS\",\n" + - " \"sequence\": 1,\n" + - " \"priority\": 1,\n" + - " \"eventId\": \"sample-event-id\",\n" + - " \"eventName\": \"sample-event-name\",\n" + - " \"eventType\": \"sample-event-type\",\n" + - " \"startEpochMicrosec\": 120034455,\n" + - " \"lastEpochMicrosec\": 120034455,\n" + - " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + - " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + - " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + - " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + - " \"sourceId\": \"sample-source-id\",\n" + - " \"sourceName\": \"sample-source-name\"\n" + - " },\n" + - " \"messageType\": \"TOO_BIG_PAYLOAD\",\n" + - " \"messagesAmount\": 100\n" + - " }\n" + - "]" +private const val validMessageParameters = +"""[ + { + "commonEventHeader": { + "version": "sample-version", + "domain": "HVMEAS", + "sequence": 1, + "priority": 1, + "eventId": "sample-event-id", + "eventName": "sample-event-name", + "eventType": "sample-event-type", + "startEpochMicrosec": 120034455, + "lastEpochMicrosec": 120034455, + "nfNamingCode": "sample-nf-naming-code", + "nfcNamingCode": "sample-nfc-naming-code", + "reportingEntityId": "sample-reporting-entity-id", + "reportingEntityName": "sample-reporting-entity-name", + "sourceId": "sample-source-id", + "sourceName": "sample-source-name", + "vesEventListenerVersion": "another-version" + }, + "messageType": "VALID", + "messagesAmount": 25000 + }, + { + "commonEventHeader": { + "version": "sample-version", + "domain": "HVMEAS", + "sequence": 1, + "priority": 1, + "eventId": "sample-event-id", + "eventName": "sample-event-name", + "eventType": "sample-event-type", + "startEpochMicrosec": 120034455, + "lastEpochMicrosec": 120034455, + "nfNamingCode": "sample-nf-naming-code", + "nfcNamingCode": "sample-nfc-naming-code", + "reportingEntityId": "sample-reporting-entity-id", + "reportingEntityName": "sample-reporting-entity-name", + "sourceId": "sample-source-id", + "sourceName": "sample-source-name", + "vesEventListenerVersion": "another-version" + }, + "messageType": "TOO_BIG_PAYLOAD", + "messagesAmount": 100 + } + ] +""" -private const val invalidMessageParameters = "[\n" + - " {\n" + - " \"commonEventHeader\": {\n" + - " \"version\": \"sample-version\",\n" + - " \"domain\": \"HVRANMEAS\",\n" + - " \"sequence\": 1,\n" + - " \"priority\": 1,\n" + - " \"eventId\": \"sample-event-id\",\n" + - " \"eventName\": \"sample-event-name\",\n" + - " \"eventType\": \"sample-event-type\",\n" + - " \"startEpochMicrosec\": 120034455,\n" + - " \"lastEpochMicrosec\": 120034455,\n" + - " \"nfNamingCode\": \"sample-nf-naming-code\",\n" + - " \"nfcNamingCode\": \"sample-nfc-naming-code\",\n" + - " \"reportingEntityId\": \"sample-reporting-entity-id\",\n" + - " \"reportingEntityName\": \"sample-reporting-entity-name\",\n" + - " \"sourceId\": \"sample-source-id\",\n" + - " \"sourceName\": \"sample-source-name\"\n" + - " },\n" + - " \"messagesAmount\": 3\n" + - " }\n" + - "]" +private const val invalidMessageParameters = +""" + [ + { + "commonEventHeader": { + "version": "sample-version", + "domain": "HVMEAS", + "sequence": 1, + "priority": 1, + "eventId": "sample-event-id", + "eventName": "sample-event-name", + "eventType": "sample-event-type", + "startEpochMicrosec": 120034455, + "lastEpochMicrosec": 120034455, + "nfNamingCode": "sample-nf-naming-code", + "nfcNamingCode": "sample-nfc-naming-code", + "reportingEntityId": "sample-reporting-entity-id", + "reportingEntityName": "sample-reporting-entity-name", + "sourceId": "sample-source-id", + "sourceName": "sample-source-name", + "vesEventListenerVersion": "another-version" + }, + "messagesAmount": 3 + } + ] +""" fun validMessagesParametesJson() = Json .createReader(validMessageParameters.reader()) |