diff options
Diffstat (limited to 'hv-collector-core')
11 files changed, 41 insertions, 54 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index 8affa0b1..a4a4374c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader internal object MessageValidator { diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index a7780109..1d43588f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -23,7 +23,7 @@ import arrow.core.Try import arrow.core.Option import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventOuterClass.VesEvent /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d8ea45d6..d08ad9e9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.retry.Jitter @@ -116,7 +115,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, for (route in routing) { val routeObj = route.asJsonObject() defineRoute { - fromDomain(forNumber(routeObj.getInt("fromDomain"))) + fromDomain(routeObj.getString("fromDomain")) toTopic(routeObj.getString("toTopic")) withFixedPartitioning() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index b611e9aa..a0c22418 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index a00a02d2..18191952 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 03996fd5..f5bfcce1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.impl.MessageValidator -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventOuterClass.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index e9cd5f3f..a42b982f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -20,8 +20,7 @@ package org.onap.dcae.collectors.veshv.model import arrow.core.Option -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventOuterClass.CommonEventHeader data class Routing(val routes: List<Route>) { @@ -29,7 +28,7 @@ data class Routing(val routes: List<Route>) { Option.fromNullable(routes.find { it.applies(commonHeader) }) } -data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { +data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain @@ -63,11 +62,11 @@ class RoutingBuilder { class RouteBuilder { - private lateinit var domain: Domain + private lateinit var domain: String private lateinit var targetTopic: String private lateinit var partitioning: (CommonEventHeader) -> Int - fun fromDomain(domain: Domain) { + fun fromDomain(domain: String) { this.domain = domain } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index 213f4544..443dfa2f 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -25,13 +25,14 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder + +import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority +import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance +import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder internal object MessageValidatorTest : Spek({ @@ -46,8 +47,7 @@ internal object MessageValidatorTest : Spek({ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() } - Domain.values() - .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) } + VesEventDomain.values() .forEach { domain -> it("should accept message with $domain domain") { val header = commonHeader(domain) @@ -65,26 +65,8 @@ internal object MessageValidatorTest : Spek({ } } - - val domainTestCases = mapOf( - Domain.DOMAIN_UNDEFINED to false, - Domain.FAULT to true - ) - - domainTestCases.forEach { value, expectedResult -> - on("ves hv message including header with domain $value") { - val commonEventHeader = commonHeader(value) - val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) - - it("should resolve validation result") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation results") - .isEqualTo(expectedResult) - } - } - } - val priorityTestCases = mapOf( - Priority.PRIORITY_UNDEFINED to false, + Priority.PRIORITY_NOT_PROVIDED to false, Priority.HIGH to true ) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 91fa7c19..c2fe1cc5 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -27,11 +27,14 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -42,13 +45,13 @@ object RouterTest : Spek({ val config = routing { defineRoute { - fromDomain(Domain.HVRANMEAS) + fromDomain(HVMEAS.name) toTopic("ves_rtpm") withFixedPartitioning(2) } defineRoute { - fromDomain(Domain.SYSLOG) + fromDomain(SYSLOG.name) toTopic("ves_trace") withFixedPartitioning() } @@ -56,7 +59,7 @@ object RouterTest : Spek({ val cut = Router(config) on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY) + val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -77,7 +80,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY) + val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -98,7 +101,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY) + val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) val result = cut.findDestination(message) it("should not have route available") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index a7d3971e..8950a557 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -41,7 +41,7 @@ internal object VesDecoderTest : Spek({ val cut = VesDecoder() on("ves hv message bytes") { - val commonHeader = commonHeader(Domain.HEARTBEAT) + val commonHeader = commonHeader(HEARTBEAT) val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) it("should decode only header and pass it on along with raw message") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 59224cca..f21a2ecf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,9 +28,11 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + import reactor.core.publisher.Mono import reactor.retry.Retry import reactor.test.StepVerifier @@ -62,14 +64,14 @@ internal object ConsulConfigurationProviderTest : Spek({ StepVerifier.create(consulConfigProvider().take(1)) .consumeNextWith { - assertEquals("kafka:9093", it.kafkaBootstrapServers) + assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) val route1 = it.routing.routes[0] - assertEquals(Domain.FAULT, route1.domain) + assertEquals(FAULT.name, route1.domain) assertEquals("test-topic-1", route1.targetTopic) val route2 = it.routing.routes[1] - assertEquals(Domain.HEARTBEAT, route2.domain) + assertEquals(HEARTBEAT.name, route2.domain) assertEquals("test-topic-2", route2.targetTopic) }.verifyComplete() @@ -95,7 +97,7 @@ internal object ConsulConfigurationProviderTest : Spek({ .verifyErrorMessage("Test exception") } - it("should update the health state"){ + it("should update the health state") { StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) @@ -128,21 +130,23 @@ private fun constructConsulConfigProvider(url: String, } +const val kafkaAddress = "message-router-kafka" + fun constructConsulResponse(): String { val config = """{ - "dmaap.kafkaBootstrapServers": "kafka:9093", + "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", "collector.routing": [ { - "fromDomain": 1, + "fromDomain": "FAULT", "toTopic": "test-topic-1" }, { - "fromDomain": 2, + "fromDomain": "HEARTBEAT", "toTopic": "test-topic-2" } ] -}""" + }""" val encodedValue = String(Base64.getEncoder().encode(config.toByteArray())) |