summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt11
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt13
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt39
3 files changed, 44 insertions, 19 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index 591a48b7..a7780109 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Try
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventV5.VesEvent
@@ -29,8 +31,9 @@ import org.onap.ves.VesEventV5.VesEvent
*/
internal class VesDecoder {
- fun decode(bytes: ByteData): VesMessage {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- return VesMessage(decodedHeader, bytes)
- }
+ fun decode(bytes: ByteData): Option<VesMessage> =
+ Try {
+ val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+ VesMessage(decodedHeader, bytes)
+ }.toOption()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 2a07b9b8..52689162 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
@@ -71,19 +72,19 @@ internal class VesHvCollector(
private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
.map(PayloadWireFrameMessage::payload)
.map(protobufDecoder::decode)
-
+ .flatMap { omitWhenNone(it) }
private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
.flatMap(this::findRoute)
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
- private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
- mapper(input).fold(
- { Mono.empty() },
- { Mono.just(it) })
+ private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
+
+ private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
+ { Mono.empty() },
+ { Mono.just(it) })
private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 90b34b1c..3f1f610e 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -19,10 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import com.google.protobuf.ByteString
-import com.google.protobuf.InvalidProtocolBufferException
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
@@ -33,6 +31,8 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import java.nio.charset.Charset
+import kotlin.test.assertTrue
+import kotlin.test.fail
internal object VesDecoderTest : Spek({
@@ -41,22 +41,24 @@ internal object VesDecoderTest : Spek({
val cut = VesDecoder()
on("ves hv message bytes") {
- val commonHeader = CommonEventHeader.getDefaultInstance()
+ val commonHeader = commonEventHeader()
val msg = VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
.build()
val rawMessageBytes = msg.toByteData()
-
it("should decode only header and pass it on along with raw message") {
val expectedMessage = VesMessage(
commonHeader,
rawMessageBytes
)
- assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage)
-
+ assertTrue {
+ cut.decode(rawMessageBytes).exists {
+ it == expectedMessage
+ }
+ }
}
}
@@ -64,9 +66,28 @@ internal object VesDecoderTest : Spek({
val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
it("should throw error") {
- assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
- .isThrownBy { cut.decode(rawMessageBytes) }
+ assertFailedWithError(cut.decode(rawMessageBytes))
}
}
}
})
+
+private fun <A> assertFailedWithError(option: Option<A>) =
+ option.exists {
+ fail("Error expected")
+ }
+
+
+private fun commonEventHeader() =
+ CommonEventHeader.getDefaultInstance().toBuilder()
+ .setDomain(CommonEventHeader.Domain.HEARTBEAT)
+ .setVersion("1.0")
+ .setEventName("xyz")
+ .setEventId("eventID")
+ .setEventName("Sample event name")
+ .setSourceName("Sample Source")
+ .setPriority(CommonEventHeader.Priority.MEDIUM)
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034459)
+ .setSequence(1)
+ .build()