aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt9
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt32
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt15
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt4
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt22
11 files changed, 41 insertions, 54 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index 8affa0b1..a4a4374c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
internal object MessageValidator {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index a7780109..1d43588f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -23,7 +23,7 @@ import arrow.core.Try
import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventOuterClass.VesEvent
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d8ea45d6..d08ad9e9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.retry.Jitter
@@ -116,7 +115,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
for (route in routing) {
val routeObj = route.asJsonObject()
defineRoute {
- fromDomain(forNumber(routeObj.getInt("fromDomain")))
+ fromDomain(routeObj.getString("fromDomain"))
toTopic(routeObj.getString("toTopic"))
withFixedPartitioning()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index b611e9aa..a0c22418 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index a00a02d2..18191952 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -24,7 +24,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 03996fd5..f5bfcce1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index e9cd5f3f..a42b982f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -20,8 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import arrow.core.Option
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
data class Routing(val routes: List<Route>) {
@@ -29,7 +28,7 @@ data class Routing(val routes: List<Route>) {
Option.fromNullable(routes.find { it.applies(commonHeader) })
}
-data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
@@ -63,11 +62,11 @@ class RoutingBuilder {
class RouteBuilder {
- private lateinit var domain: Domain
+ private lateinit var domain: String
private lateinit var targetTopic: String
private lateinit var partitioning: (CommonEventHeader) -> Int
- fun fromDomain(domain: Domain) {
+ fun fromDomain(domain: String) {
this.domain = domain
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index 213f4544..443dfa2f 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -25,13 +25,14 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
+
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
@@ -46,8 +47,7 @@ internal object MessageValidatorTest : Spek({
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
}
- Domain.values()
- .filter { (it != Domain.UNRECOGNIZED && it != Domain.DOMAIN_UNDEFINED) }
+ VesEventDomain.values()
.forEach { domain ->
it("should accept message with $domain domain") {
val header = commonHeader(domain)
@@ -65,26 +65,8 @@ internal object MessageValidatorTest : Spek({
}
}
-
- val domainTestCases = mapOf(
- Domain.DOMAIN_UNDEFINED to false,
- Domain.FAULT to true
- )
-
- domainTestCases.forEach { value, expectedResult ->
- on("ves hv message including header with domain $value") {
- val commonEventHeader = commonHeader(value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
-
- it("should resolve validation result") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
- .isEqualTo(expectedResult)
- }
- }
- }
-
val priorityTestCases = mapOf(
- Priority.PRIORITY_UNDEFINED to false,
+ Priority.PRIORITY_NOT_PROVIDED to false,
Priority.HIGH to true
)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 91fa7c19..c2fe1cc5 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -27,11 +27,14 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -42,13 +45,13 @@ object RouterTest : Spek({
val config = routing {
defineRoute {
- fromDomain(Domain.HVRANMEAS)
+ fromDomain(HVMEAS.name)
toTopic("ves_rtpm")
withFixedPartitioning(2)
}
defineRoute {
- fromDomain(Domain.SYSLOG)
+ fromDomain(SYSLOG.name)
toTopic("ves_trace")
withFixedPartitioning()
}
@@ -56,7 +59,7 @@ object RouterTest : Spek({
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(Domain.HVRANMEAS), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HVMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -77,7 +80,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(Domain.SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -98,7 +101,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(commonHeader(Domain.HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index a7d3971e..8950a557 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -26,10 +26,10 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -41,7 +41,7 @@ internal object VesDecoderTest : Spek({
val cut = VesDecoder()
on("ves hv message bytes") {
- val commonHeader = commonHeader(Domain.HEARTBEAT)
+ val commonHeader = commonHeader(HEARTBEAT)
val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 59224cca..f21a2ecf 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,9 +28,11 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -62,14 +64,14 @@ internal object ConsulConfigurationProviderTest : Spek({
StepVerifier.create(consulConfigProvider().take(1))
.consumeNextWith {
- assertEquals("kafka:9093", it.kafkaBootstrapServers)
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
val route1 = it.routing.routes[0]
- assertEquals(Domain.FAULT, route1.domain)
+ assertEquals(FAULT.name, route1.domain)
assertEquals("test-topic-1", route1.targetTopic)
val route2 = it.routing.routes[1]
- assertEquals(Domain.HEARTBEAT, route2.domain)
+ assertEquals(HEARTBEAT.name, route2.domain)
assertEquals("test-topic-2", route2.targetTopic)
}.verifyComplete()
@@ -95,7 +97,7 @@ internal object ConsulConfigurationProviderTest : Spek({
.verifyErrorMessage("Test exception")
}
- it("should update the health state"){
+ it("should update the health state") {
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
.expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
@@ -128,21 +130,23 @@ private fun constructConsulConfigProvider(url: String,
}
+const val kafkaAddress = "message-router-kafka"
+
fun constructConsulResponse(): String {
val config = """{
- "dmaap.kafkaBootstrapServers": "kafka:9093",
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
"collector.routing": [
{
- "fromDomain": 1,
+ "fromDomain": "FAULT",
"toTopic": "test-topic-1"
},
{
- "fromDomain": 2,
+ "fromDomain": "HEARTBEAT",
"toTopic": "test-topic-2"
}
]
-}"""
+ }"""
val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))