aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt5
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt7
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt4
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt23
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt9
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt12
9 files changed, 46 insertions, 42 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 3f69c088..1334738a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.MessageDropCause
@@ -31,8 +32,8 @@ interface Sink {
interface Metrics {
fun notifyBytesReceived(size: Int)
- fun notifyMessageReceived(size: Int)
- fun notifyMessageSent(topic: String)
+ fun notifyMessageReceived(msg: WireFrameMessage)
+ fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index c670e1d8..ee499e19 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Try
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventOuterClass.VesEvent
@@ -30,9 +31,9 @@ import org.onap.ves.VesEventOuterClass.VesEvent
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): Try<VesMessage> =
+ fun decode(frame: WireFrameMessage): Try<VesMessage> =
Try {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- VesMessage(decodedHeader, bytes)
+ val decodedHeader = VesEvent.parseFrom(frame.payload.unsafeAsArray()).commonEventHeader
+ VesMessage(decodedHeader, frame)
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index b29432f0..51f894d3 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
@@ -68,7 +67,7 @@ internal class VesHvCollector(
private fun decodeWireFrame(flux: Flux<ByteBuf>): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(wireChunkDecoder::decode)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+ .doOnNext(metrics::notifyMessageReceived)
private fun filterInvalidWireFrame(flux: Flux<WireFrameMessage>): Flux<WireFrameMessage> = flux
.filterFailedWithLog {
@@ -78,15 +77,14 @@ internal class VesHvCollector(
}
private fun decodeProtobufPayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
- .map(WireFrameMessage::payload)
- .flatMap(::decodePayload)
-
- private fun decodePayload(rawPayload: ByteData): Flux<VesMessage> = protobufDecoder
- .decode(rawPayload)
- .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
- .filterFailedWithLog(logger, clientContext::fullMdc,
- { "Ves event header decoded successfully" },
- { "Failed to decode ves event header, reason: ${it.message}" })
+ .flatMap { frame ->
+ protobufDecoder
+ .decode(frame)
+ .doOnFailure { metrics.notifyMessageDropped(INVALID_MESSAGE) }
+ .filterFailedWithLog(logger, clientContext::fullMdc,
+ { "Ves event header decoded successfully" },
+ { "Failed to decode ves event header, reason: ${it.message}" })
+ }
private fun filterInvalidProtobufMessages(flux: Flux<VesMessage>): Flux<VesMessage> = flux
.filterFailedWithLog {
@@ -98,7 +96,7 @@ internal class VesHvCollector(
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
+ .doOnNext(metrics::notifyMessageSent)
private fun findRoute(msg: VesMessage) = router
.findDestination(msg)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index ec8593af..14966d9b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+ val bytes = totalBytes.addAndGet(msg.message.wtpFrame.payloadSize.toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
logger.info(ctx, logMessageSupplier)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
index 7a6ac7c8..c92518a5 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -28,10 +28,12 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
*/
class VesMessageSerializer : Serializer<VesMessage> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // not needed
}
- override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.wtpFrame?.payload?.unsafeAsArray()
override fun close() {
+ // not needed
}
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 1965d78c..d3640193 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.model
-import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.ves.VesEventOuterClass.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
+data class VesMessage(val header: CommonEventHeader, val wtpFrame: WireFrameMessage)
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index f784daa4..7d136ef1 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -29,7 +29,8 @@ import org.jetbrains.spek.api.dsl.*
import org.onap.dcae.collectors.veshv.domain.*
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -43,7 +44,7 @@ internal object MessageValidatorTest : Spek({
val commonHeader = commonHeader()
it("should accept message with fully initialized message header") {
- val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
+ val vesMessage = VesMessage(commonHeader, wireProtocolFrame(commonHeader))
with(cut) {
assertThat(validateProtobufMessage(vesMessage).isRight())
.describedAs("message validation result").isTrue()
@@ -53,7 +54,7 @@ internal object MessageValidatorTest : Spek({
VesEventDomain.values().forEach { domain ->
it("should accept message with $domain domain") {
val header = commonHeader(domain)
- val vesMessage = VesMessage(header, vesEventBytes(header))
+ val vesMessage = VesMessage(header, wireProtocolFrame(header))
with(cut) {
assertThat(validateProtobufMessage(vesMessage).isRight())
.describedAs("message validation result").isTrue()
@@ -63,7 +64,7 @@ internal object MessageValidatorTest : Spek({
}
on("ves hv message bytes") {
- val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
+ val vesMessage = VesMessage(getDefaultInstance(), emptyWireProtocolFrame())
it("should not accept message with default header") {
with(cut) {
@@ -100,7 +101,7 @@ internal object MessageValidatorTest : Spek({
).forEach { value, expectedResult ->
on("ves hv message including header with priority $value") {
val commonEventHeader = commonHeader(priority = value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
+ val vesMessage = VesMessage(commonEventHeader, wireProtocolFrame(commonEventHeader))
it("should resolve validation result") {
with(cut) {
@@ -121,7 +122,7 @@ internal object MessageValidatorTest : Spek({
.setEventId("Sample event Id")
.setSourceName("Sample Source")
.build()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept not fully initialized message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -148,7 +149,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with non-string major part") {
val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
@@ -169,7 +170,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with major part != 7") {
val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -190,7 +191,7 @@ internal object MessageValidatorTest : Spek({
on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
it("should not accept message header") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
@@ -237,7 +238,7 @@ internal object MessageValidatorTest : Spek({
with(cut) {
on("valid message as input") {
val commonHeader = commonHeader()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
it("should be right") {
@@ -247,7 +248,7 @@ internal object MessageValidatorTest : Spek({
}
on("invalid message as input") {
val commonHeader = newBuilder().build()
- val rawMessageBytes = vesEventBytes(commonHeader)
+ val rawMessageBytes = wireProtocolFrame(commonHeader)
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
it("should be left") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index e4190163..90b850c0 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -21,13 +21,11 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
import arrow.core.Some
-import io.netty.buffer.ByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
@@ -36,6 +34,7 @@ import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
/**
@@ -61,7 +60,7 @@ object RouterTest : Spek({
val cut = Router(config, ClientContext())
on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should have route available") {
@@ -82,7 +81,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should have route available") {
@@ -103,7 +102,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
+ val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 605e7a6e..74f33a78 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -29,7 +29,8 @@ import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
+import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -42,16 +43,16 @@ internal object VesDecoderTest : Spek({
on("ves hv message bytes") {
val commonHeader = commonHeader(HEARTBEAT)
- val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
+ val wtpFrame = wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
it("should decode only header and pass it on along with raw message") {
val expectedMessage = VesMessage(
commonHeader,
- rawMessageBytes
+ wtpFrame
)
assertTrue {
- cut.decode(rawMessageBytes).exists {
+ cut.decode(wtpFrame).exists {
it == expectedMessage
}
}
@@ -60,9 +61,10 @@ internal object VesDecoderTest : Spek({
on("invalid ves hv message bytes") {
val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
+ val wtpFrame = emptyWireProtocolFrame().copy(payload = rawMessageBytes, payloadSize = rawMessageBytes.size())
it("should throw error") {
- assertFailedWithError(cut.decode(rawMessageBytes))
+ assertFailedWithError(cut.decode(wtpFrame))
}
}
}