summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPrzemyslaw Wasala <przemyslaw.wasala@nokia.com>2018-09-24 10:11:42 +0000
committerGerrit Code Review <gerrit@onap.org>2018-09-24 10:11:42 +0000
commit7b269674526a267f14895df8b825f3b59b30b98a (patch)
tree606cdabf7354c0d2a70aa2e0c63a1e36c0a76d1a
parent8f4a019fb04f0b3c408179ae1ae6daa9b742cdba (diff)
parente880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (diff)
Merge "Remove end-of-transmission message from protocol"
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt27
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt20
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt119
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt45
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt4
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt2
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt16
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt54
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt10
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt46
-rw-r--r--hv-collector-domain/src/main/proto/event/VesEvent.proto63
-rw-r--r--hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto14
-rw-r--r--hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto66
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt93
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt11
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt2
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt4
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt22
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt22
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt1
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt11
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt4
22 files changed, 201 insertions, 455 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index f608a2b9..8970e03e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.core.publisher.SynchronousSink
-import java.util.function.BiConsumer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -53,7 +48,7 @@ internal class VesHvCollector(
wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.transform { decodeWireFrame(it, wireDecoder) }
- .filter(PayloadWireFrameMessage::isValid)
+ .filter(WireFrameMessage::isValid)
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
@@ -62,14 +57,13 @@ internal class VesHvCollector(
.then()
}
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
.doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
.concatMap(decoder::decode)
- .handle(completeStreamOnEOT)
.doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
- private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux
- .map(PayloadWireFrameMessage::payload)
+ private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ .map(WireFrameMessage::payload)
.map(protobufDecoder::decode)
.flatMap { omitWhenNone(it) }
@@ -95,18 +89,5 @@ internal class VesHvCollector(
companion object {
private val logger = Logger(VesHvCollector::class)
-
- private val completeStreamOnEOT by lazy {
- BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink ->
- when (frame) {
- is EndOfTransmissionMessage -> {
- logger.info("Completing stream because of receiving EOT message")
- sink.complete()
- }
- is PayloadWireFrameMessage -> sink.next(frame)
- else -> sink.error(UnknownWireFrameTypeException(frame))
- }
- }
- }
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 80f62d1a..0775c652 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.SynchronousSink
@@ -76,15 +74,9 @@ internal class WireChunkDecoder(
}
private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- when (frame) {
- is PayloadWireFrameMessage -> IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
- is EndOfTransmissionMessage -> IO {
- logEndOfTransmissionWireMessage()
- next.next(frame)
- }
+ IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
}
}
@@ -92,14 +84,10 @@ internal class WireChunkDecoder(
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
- private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) {
+ private fun logDecodedWireMessage(wire: WireFrameMessage) {
logger.trace { "Wire payload size: ${wire.payloadSize} B" }
}
- private fun logEndOfTransmissionWireMessage() {
- logger.trace { "Received end-of-transmission message" }
- }
-
private fun logEndOfData() {
logger.trace { "End of data in current TCP buffer" }
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index a9364ed3..d214ffcf 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -27,13 +27,10 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import reactor.test.test
-import kotlin.test.fail
/**
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
@@ -46,7 +43,7 @@ internal object WireChunkDecoderTest : Spek({
val encoder = WireFrameEncoder(alloc)
- fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame))
+ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
@@ -101,23 +98,23 @@ internal object WireChunkDecoderTest : Spek({
}
given("valid input") {
- val input = PayloadWireFrameMessage(samplePayload)
+ val input = WireFrameMessage(samplePayload)
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
}
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -126,30 +123,14 @@ internal object WireChunkDecoderTest : Spek({
}
}
- given("end-of-transmission marker byte with garbage after it") {
- val input = Unpooled.buffer()
- .writeByte(0xAA)
- .writeBytes(Unpooled.wrappedBuffer(samplePayload))
-
- it("should yield decoded end-of-transmission frame and error") {
- createInstance().decode(input).test()
- .expectNextMatches { it is EndOfTransmissionMessage }
- .verifyError(WireFrameException::class.java)
- }
-
- it("should leave memory unreleased") {
- verifyMemoryNotReleased(input)
- }
- }
-
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
createInstance().decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyError(WireFrameException::class.java)
}
@@ -159,16 +140,16 @@ internal object WireChunkDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = encoder.encode(WireFrameMessage(anotherPayload))
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -177,57 +158,15 @@ internal object WireChunkDecoderTest : Spek({
}
}
- given("two payload messages followed by end-of-transmission marker byte") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
- val input = Unpooled.buffer()
- .writeBytes(frame1)
- .writeBytes(frame2)
- .writeByte(0xAA)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
- .expectNextMatches { it is EndOfTransmissionMessage }
- .verifyComplete()
- }
- }
-
- given("two payload messages separated by end-of-transmission marker byte") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
-
- val input = Unpooled.buffer()
- .writeBytes(frame1)
- .writeByte(0xAA)
- .writeBytes(frame2)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { it is EndOfTransmissionMessage }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
- .verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input)
- }
- }
-
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1)
.test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
.verifyError(WireFrameException::class.java)
@@ -244,8 +183,8 @@ internal object WireChunkDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -255,10 +194,10 @@ internal object WireChunkDecoderTest : Spek({
it("should yield decoded input frames") {
val cut = createInstance()
cut.decode(input1).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -268,8 +207,8 @@ internal object WireChunkDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload))
- val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload))
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -282,8 +221,8 @@ internal object WireChunkDecoderTest : Spek({
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size }
- .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size }
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
.verifyComplete()
}
@@ -292,16 +231,4 @@ internal object WireChunkDecoderTest : Spek({
}
}
}
-})
-
-
-private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage =
- if (msg is PayloadWireFrameMessage) {
- msg
- } else {
- fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}")
- }
-
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- this as? EndOfTransmissionMessage
- ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") \ No newline at end of file
+}) \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 60e10ee0..3eba9b6b 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -37,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting
import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
-import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
@@ -66,30 +65,6 @@ object VesHvSpecification : Spek({
.describedAs("should send all events")
.hasSize(2)
}
-
- it("should not handle messages received from client after end-of-transmission message") {
- val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(HVMEAS)
- val anotherValidMessage = vesWireFrameMessage(HVMEAS)
- val endOfTransmissionMessage = endOfTransmissionWireMessage()
-
- val handledEvents = sut.handleConnection(sink,
- validMessage,
- endOfTransmissionMessage,
- anotherValidMessage
- )
-
- assertThat(handledEvents).hasSize(1)
- assertThat(validMessage.refCnt())
- .describedAs("first message should be released")
- .isEqualTo(0)
- assertThat(endOfTransmissionMessage.refCnt())
- .describedAs("end-of-transmission message should be released")
- .isEqualTo(0)
- assertThat(anotherValidMessage.refCnt())
- .describedAs("second (not handled) message should not be released")
- .isEqualTo(1)
- }
}
describe("Memory management") {
@@ -116,26 +91,6 @@ object VesHvSpecification : Spek({
.isEqualTo(expectedRefCnt)
}
- it("should release memory for end-of-transmission message") {
- val (sut, sink) = vesHvWithStoringSink()
- val validMessage = vesWireFrameMessage(HVMEAS)
- val endOfTransmissionMessage = endOfTransmissionWireMessage()
- val expectedRefCnt = 0
-
- val handledEvents = sut.handleConnection(sink,
- validMessage,
- endOfTransmissionMessage
- )
-
- assertThat(handledEvents).hasSize(1)
- assertThat(validMessage.refCnt())
- .describedAs("handled message should be released")
- .isEqualTo(expectedRefCnt)
- assertThat(endOfTransmissionMessage.refCnt())
- .describedAs("end-of-transmission message should be released")
- .isEqualTo(expectedRefCnt)
- }
-
it("should release memory for each message with invalid payload") {
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(HVMEAS)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 51f94cc4..38de5370 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -24,7 +24,7 @@ import arrow.effects.fix
import arrow.effects.monadError
import arrow.typeclasses.bindingCatch
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -77,7 +77,7 @@ class MessageStreamValidation(
private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> =
messageGenerator.createMessageFlux(parameters)
- .map(PayloadWireFrameMessage::payload)
+ .map(WireFrameMessage::payload)
.map(ByteData::unsafeAsArray)
.map(VesEventOuterClass.VesEvent::parseFrom)
.collectList()
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index 017360bb..34ec8f5a 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setEventFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index beef26b6..05fdd808 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -31,7 +31,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anyList
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
@@ -89,7 +89,7 @@ internal class MessageStreamValidationTest : Spek({
// given
val jsonAsStream = sampleJsonAsStream()
val event = vesEvent()
- val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
val receivedMessageBytes = event.toByteArray()
givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1))
@@ -107,7 +107,7 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent(payload = "payload A")
val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
@@ -125,7 +125,7 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent()
val receivedEvent = vesEvent(eventId = "bbb")
- val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1))
@@ -144,7 +144,7 @@ internal class MessageStreamValidationTest : Spek({
// given
val jsonAsStream = sampleJsonAsStream()
val event = vesEvent()
- val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray())
val receivedMessageBytes = event.toByteArray()
givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -162,7 +162,7 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent(payload = "payload A")
val receivedEvent = vesEvent(payload = "payload B")
- val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -180,7 +180,7 @@ internal class MessageStreamValidationTest : Spek({
val jsonAsStream = sampleJsonAsStream()
val generatedEvent = vesEvent()
val receivedEvent = vesEvent("bbb")
- val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray())
+ val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray())
val receivedMessageBytes = receivedEvent.toByteArray()
givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1))
@@ -205,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P
return VesEvent.newBuilder()
.setCommonEventHeader(CommonEventHeader.newBuilder()
.setEventId(eventId))
- .setHvMeasFields(ByteString.copyFrom(payload.toByteArray()))
+ .setEventFields(ByteString.copyFrom(payload.toByteArray()))
.build()
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index c61ab266..4f867f13 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,8 +24,8 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R
*/
class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- fun encode(frame: PayloadWireFrameMessage): ByteBuf {
- val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size())
-
- bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt())
- bb.writeByte(frame.versionMajor.toInt())
- bb.writeByte(frame.versionMinor.toInt())
- bb.writeZero(RESERVED_BYTE_COUNT)
- bb.writeByte(frame.payloadTypeRaw.toInt())
- bb.writeInt(frame.payloadSize)
- frame.payload.writeTo(bb)
-
- return bb
- }
+ fun encode(frame: WireFrameMessage): ByteBuf = allocator
+ .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size())
+ .run {
+ writeByte(WireFrameMessage.MARKER_BYTE.toInt())
+ writeByte(frame.versionMajor.toInt())
+ writeByte(frame.versionMinor.toInt())
+ writeZero(RESERVED_BYTE_COUNT)
+ writeByte(frame.payloadType.toInt())
+ writeInt(frame.payloadSize)
+ }
+ .also {
+ frame.payload.writeTo(it)
+ }
}
/**
@@ -57,36 +57,20 @@ class WireFrameDecoder {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
isEmpty(byteBuf) -> Left(EmptyWireFrame)
- isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf)
headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes)
else -> parseWireFrame(byteBuf)
}
private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1
- private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1
-
- private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE
-
- private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> {
- byteBuf.markReaderIndex()
- val byte = byteBuf.readUnsignedByte()
-
- return if (byte == EndOfTransmissionMessage.MARKER_BYTE) {
- Right(EndOfTransmissionMessage)
- } else {
- byteBuf.resetReaderIndex()
- Left(MissingWireFrameHeaderBytes)
- }
- }
+ private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE
private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
byteBuf.markReaderIndex()
val mark = byteBuf.readUnsignedByte()
return when (mark) {
- EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage)
- PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
+ WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf)
else -> {
byteBuf.resetReaderIndex()
Left(InvalidWireFrameMarker(mark))
@@ -94,7 +78,7 @@ class WireFrameDecoder {
}
}
- private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> {
+ private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
@@ -113,7 +97,7 @@ class WireFrameDecoder {
val payload = ByteData.readFrom(byteBuf, payloadSize)
- return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
+ return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize))
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index d82bb25f..dfadc5b8 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg)
class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
"Invalid start of frame. Expected 0x%02X, but was 0x%02X"
- .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker)
+ .format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
@@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg)
object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size")
object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size")
object EmptyWireFrame : MissingWireFrameBytes("empty wire frame")
-
-
-// Other
-
-class UnknownWireFrameTypeException(frame: WireFrameMessage)
- : Throwable("Unexpected wire frame message type: ${frame.javaClass}")
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 642179e1..06ca9383 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -20,10 +20,8 @@
package org.onap.dcae.collectors.veshv.domain
-sealed class WireFrameMessage
-
/**
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian).
*
* ```
* -- Precedes every HV-VES message
@@ -31,21 +29,22 @@ sealed class WireFrameMessage
* magic INTEGER (0..255), – always 0xFF, identifies extended header usage
* versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v
* versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v
- * reserved BIT STRING (SIZE (16)), – reserved for future use
- * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
- * messageLength INTEGER (0..4294967295) – message payload length
+ * reserved OCTET STRING (SIZE (3)), – reserved for future use
+ * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf
+ * payloadLength INTEGER (0..4294967295) – message payload length
+ * payload OCTET STRING – length as per payloadLength
* }
* ```
*
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class PayloadWireFrameMessage(val payload: ByteData,
- val versionMajor: Short,
- val versionMinor: Short,
- val payloadTypeRaw: Short,
- val payloadSize: Int
-) : WireFrameMessage() {
+data class WireFrameMessage(val payload: ByteData,
+ val versionMajor: Short,
+ val versionMinor: Short,
+ val payloadType: Short,
+ val payloadSize: Int
+) {
constructor(payload: ByteArray) : this(
ByteData(payload),
SUPPORTED_VERSION_MAJOR,
@@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData,
fun isValid(): Boolean =
versionMajor == SUPPORTED_VERSION_MAJOR
- && PayloadContentType.isValidHexValue(payloadTypeRaw)
+ && PayloadContentType.isValidHexValue(payloadType)
&& payload.size() == payloadSize
companion object {
@@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData,
const val MAX_PAYLOAD_SIZE = 1024 * 1024
}
}
-
-
-/**
- * This message type should be used by client to indicate that he has finished sending data to collector.
- *
- * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
- *
- * ```
- * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination
- * Eot ::= SEQUENCE {
- * magic INTEGER (0..255), – always 0xAA
- * }
- * ```
- *
- * @since July 2018
- */
-
-object EndOfTransmissionMessage : WireFrameMessage() {
- const val MARKER_BYTE: Short = 0xAA
-}
-
diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto
index 54a6d149..0f9e5e1f 100644
--- a/hv-collector-domain/src/main/proto/event/VesEvent.proto
+++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto
@@ -21,54 +21,55 @@ syntax = "proto3";
package org.onap.ves;
message VesEvent {
- CommonEventHeader commonEventHeader = 1; // required
+ CommonEventHeader commonEventHeader=1; // required
- oneof eventFields // required, payload
- {
- // each new high-volume domain can add an entry for its own GPB message
- // the field can be opaque (bytes) to allow decoding the payload in a separate step
- bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields
- }
+ bytes eventFields=2; // required, payload
+ // this field contains a domain-specific GPB message
+ // the field being opaque (bytes), the decoding of the payload occurs in a separate step
+ // the name of the GPB message for domain XYZ is XYZFields
+ // e.g. for domain==HVMEAS, the GPB message is HVMEASFields
}
// VES CommonEventHeader adapted to GPB (Google Protocol Buffers)
-// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain
+// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain.
-message CommonEventHeader {
- string version = 1; // required, "version of the gpb common event header"
- string domain = 2; // required, "the eventing domain associated with the event", allowed values:
- // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
- // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
+message CommonEventHeader
+{
+ string version = 1; // required, "version of the gpb common event header"
+ string domain = 2; // required, "the eventing domain associated with the event", allowed values:
+ // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING,
+ // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS
- uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
+ uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed"
- enum Priority {
+ enum Priority
+ {
PRIORITY_NOT_PROVIDED = 0;
HIGH = 1;
MEDIUM = 2;
NORMAL = 3;
LOW = 4;
}
- Priority priority = 4; // required, "processing priority"
+ Priority priority = 4; // required, "processing priority"
- string eventId = 5; // required, "event key that is unique to the event source"
- string eventName = 6; // required, "unique event name"
- string eventType = 7; // "for example - guest05, platform"
+ string eventId = 5; // required, "event key that is unique to the event source"
+ string eventName = 6; // required, "unique event name"
+ string eventType = 7; // "for example - guest05, platform"
- uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
+ uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds"
- string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
- string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
- string nfVendorName = 12; // " Vendor Name providing the nf "
+ string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards"
+ string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards"
+ string nfVendorName = 12; // " Vendor Name providing the nf "
- bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
- string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
- bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
- string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
- string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
- string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
+ bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process"
+ string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry"
+ bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process"
+ string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry"
+ string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device"
+ string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener"
- reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
+ reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources"
reserved 100;
}
diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
index 9a8582d5..94b40106 100644
--- a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
+++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto
@@ -24,20 +24,14 @@ import "MeasDataCollection.proto"; // for 3GPP PM format
message HVMeasFields
{
string hvMeasFieldsVersion = 1;
- measDataCollection.MeasDataCollection measDataCollection = 2;
- // From 3GPP TS 28.550
+ MeasDataCollection measDataCollection = 2;
+ // Based on 3GPP TS 28.550
// Informative: mapping between similar header fields (format may be different)
- // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader
+ // 3GPP MeasHeader ONAP/VES CommonEventHeader
// senderName sourceName
// senderType nfNamingCode + nfcNamingCode
// vendorName nfVendorName
// collectionBeginTime startEpochMicrosec
// timestamp lastEpochMicrosec
- repeated HashMap eventAddlFlds = 3; // optional per-event data
+ map<string, string> eventAddlFlds = 3; // optional per-event data (name/value HashMap)
}
-
-message HashMap
-{
- string name = 1;
- string value = 2;
-} \ No newline at end of file
diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
index 472dcc43..31f4dfb1 100644
--- a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
+++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto
@@ -18,87 +18,65 @@
* ============LICENSE_END=========================================================
*/
syntax = "proto3";
-package measDataCollection;
+package org.onap.ves;
-// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).
// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
-// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.
+// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.
// Differences/additions to 3GPP TS 28.550 are marked with "%%".
-message MeasDataCollection // top-level message
+message MeasDataCollection // top-level message
{
- MeasHeader measHeader = 1;
- repeated MeasData measData = 2; // %%: use a single instance for RTPM
- MeasFooter measFooter = 3;
-}
-
-message MeasHeader
-{
- string streamFormatVersion = 1;
+ // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).
+ string formatVersion = 1;
string senderName = 2;
string senderType = 3;
string vendorName = 4;
string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)
+ uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event)
+ string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622
+ string measuredEntityDn = 8; // DN as per 3GPP TS 32.300
+ string measuredEntitySoftwareVersion = 9;
+ repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 11;
}
-message MeasData
-{
- string measuredEntityId = 1; // DN as per 3GPP TS 32.300
- string measuredEntityUserName = 2; // network function User Name
- string measuredEntitySoftwareVersion = 3;
- uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)
- repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
- repeated MeasInfo measInfo = 6;
-}
-
-
message MeasInfo
{
oneof MeasInfoId { // measurement group identifier
- uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
- string measInfoId = 2; // identifier as string (more generic)
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
}
oneof MeasTypes { // measurement identifiers associated with the measurement results
- IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
- SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
}
// Needed only because GPB does not support repeated fields directly inside 'oneof'
message IMeasTypes { repeated uint32 iMeasType = 1; }
message SMeasTypes { repeated string measType = 1; }
- string jobIdList = 5;
- repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups
+ string jobId = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement object
}
message MeasValue
{
oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
- string measObjInstId = 1; // LDN itself
- uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
}
repeated MeasResult measResults = 3;
bool suspectFlag = 4;
- repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data
+ map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)
}
message MeasResult
{
- uint32 p = 1; // Optional index in the MeasTypes array
+ uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes
oneof xValue {
sint64 iValue = 2;
double rValue = 3;
bool isNull = 4;
}
}
-
-message MeasFooter
-{
- string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"
-}
-
-message nameValue // %%: vendor-defined name-value pair
-{
- string name = 1;
- string value = 2;
-} \ No newline at end of file
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index b992d530..988789d2 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -28,7 +28,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -42,8 +42,7 @@ object WireFrameCodecsTest : Spek({
val encoder = WireFrameEncoder()
val decoder = WireFrameDecoder()
- fun createSampleFrame() =
- PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
+ fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
@@ -53,11 +52,11 @@ object WireFrameCodecsTest : Spek({
describe("Wire Frame invariants") {
given("input with unsupported major version") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 100,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 0)
it("should fail validation") {
@@ -66,11 +65,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported minor version") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 1,
versionMinor = 6,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 0)
it("should pass validation") {
@@ -79,11 +78,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with unsupported payload type") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData.EMPTY,
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = 0x69,
+ payloadType = 0x69,
payloadSize = 0)
it("should fail validation") {
@@ -92,11 +91,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with too small payload size") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 1)
it("should fail validation") {
@@ -105,11 +104,11 @@ object WireFrameCodecsTest : Spek({
}
given("input with too big payload size") {
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(byteArrayOf(1, 2, 3)),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = 8)
it("should fail validation") {
@@ -119,11 +118,11 @@ object WireFrameCodecsTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
it("should pass validation") {
@@ -139,14 +138,18 @@ object WireFrameCodecsTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail()
+ val decoded = decoder.decodeFirst(encoded).getMessageOrFail()
- it("should decode version") {
+ it("should decode major version") {
assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor)
}
+ it("should decode minor version") {
+ assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor)
+ }
+
it("should decode payload type") {
- assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw)
+ assertThat(decoded.payloadType).isEqualTo(frame.payloadType)
}
it("should decode payload size") {
@@ -170,14 +173,7 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return end-of-transmission message when given end-of-transmission marker byte") {
- val buff = Unpooled.buffer()
- .writeByte(0xAA)
-
- assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
- }
-
- it("should return error when given any single byte other than end-of-transmission marker byte") {
+ it("should return error when given any single byte other than marker byte") {
val buff = Unpooled.buffer()
.writeByte(0xEE)
@@ -194,7 +190,7 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return error when length looks ok but first byte is not 0xFF or 0xAA") {
+ it("should return error when length looks ok but first byte is not 0xFF") {
val buff = Unpooled.buffer()
.writeByte(0x69)
.writeBytes("some garbage".toByteArray())
@@ -203,14 +199,6 @@ object WireFrameCodecsTest : Spek({
assertBufferIntact(buff)
}
- it("should return end-of-transmission message when length looks ok and first byte is 0xAA") {
- val buff = Unpooled.buffer()
- .writeByte(0xAA)
- .writeBytes("some garbage".toByteArray())
-
- assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff))
- }
-
it("should return error when payload doesn't fit") {
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
@@ -223,8 +211,8 @@ object WireFrameCodecsTest : Spek({
it("should decode payload message leaving rest unread") {
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
- .writeByte(0xAA)
- val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail()
+ .writeByte(0xAB)
+ val decoded = decoder.decodeFirst(buff).getMessageOrFail()
assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
assertThat(buff.readableBytes()).isEqualTo(1)
@@ -236,11 +224,11 @@ object WireFrameCodecsTest : Spek({
it("should decode successfully when payload size is equal 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
@@ -250,11 +238,11 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
val buff = encoder.encode(input)
@@ -266,11 +254,11 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
val payload = ByteArray(MAX_PAYLOAD_SIZE)
- val input = PayloadWireFrameMessage(
+ val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
versionMinor = 0,
- payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
payloadSize = payload.size)
@@ -289,21 +277,6 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>)
fold({ assertj(assertThat(it)) }, { fail("Error expected") })
}
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage =
- fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() })
-
-private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage =
- this as? PayloadWireFrameMessage
- ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}")
-
-
-private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) {
- decoded.getEndOfTransmissionMessageOrFail()
-}
-
-private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() })
+private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage =
+ fold({ fail(it.message) }, { it })
-private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage =
- this as? EndOfTransmissionMessage
- ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index 78042260..85bdcab7 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -23,8 +23,8 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
@@ -52,9 +52,6 @@ fun vesWireFrameMessage(domain: VesEventDomain = OTHER,
writeBytes(gpb) // ves event as GPB bytes
}
-fun endOfTransmissionWireMessage(): ByteBuf =
- allocator.buffer().writeByte(0xAA)
-
fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run {
writeValidWireFrameHeaders()
@@ -69,8 +66,8 @@ fun garbageFrame(): ByteBuf = allocator.buffer().run {
fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0xFF)
- writeByte(0x01) // version
- writeByte(0x01) // content type = GPB
+ writeByte(0x01) // version major
+ writeByte(0x01) // version minor
}
fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf =
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index 0341c2ff..57b960af 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -39,7 +39,7 @@ fun vesEvent(commonEventHeader: CommonEventHeader,
hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent =
VesEventOuterClass.VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvMeasFields(hvRanMeasFields)
+ .setEventFields(hvRanMeasFields)
.build()
fun commonHeader(domain: VesEventDomain = HVMEAS,
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index d9329cb0..ace7f1cb 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.ves.message.generator.api
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import reactor.core.publisher.Flux
@@ -29,7 +29,7 @@ import reactor.core.publisher.Flux
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage>
+ fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
companion object {
val INSTANCE: MessageGenerator by lazy {
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index 5d1f56dc..90e7770b 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.domain.PayloadContentType
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
@@ -44,11 +44,11 @@ import java.nio.charset.Charset
*/
class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux
+ override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
.fromIterable(messageParameters)
.flatMap { createMessageFlux(it) }
- private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> =
+ private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> =
Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) }
.let {
if (parameters.amount < 0)
@@ -57,17 +57,17 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
it.repeat(parameters.amount)
}
- private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage =
+ private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage =
when (messageType) {
VALID ->
- PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
+ WireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
TOO_BIG_PAYLOAD ->
- PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
+ WireFrameMessage(vesEvent(commonEventHeader, oversizedPayload()))
FIXED_PAYLOAD ->
- PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
+ WireFrameMessage(vesEvent(commonEventHeader, fixedPayload()))
INVALID_WIRE_FRAME -> {
val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload()))
- PayloadWireFrameMessage(
+ WireFrameMessage(
payload,
UNSUPPORTED_VERSION,
UNSUPPORTED_VERSION,
@@ -75,7 +75,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
payload.size())
}
INVALID_GPB_DATA ->
- PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
+ WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset()))
}
private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray {
@@ -85,11 +85,11 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent =
VesEvent.newBuilder()
.setCommonEventHeader(commonEventHeader)
- .setHvMeasFields(payload)
+ .setEventFields(payload)
.build()
private fun oversizedPayload() =
- payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+ payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
private fun fixedPayload() =
payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
index ea3d094a..e380f931 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
@@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
@@ -87,7 +87,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.verifyComplete()
@@ -105,7 +105,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.verifyComplete()
@@ -122,7 +122,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
.isThrownBy { extractCommonEventHeader(it.payload) }
}
@@ -140,9 +140,9 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isFalse()
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
- assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR)
+ assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
}
.verifyComplete()
}
@@ -158,7 +158,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
@@ -177,17 +177,17 @@ object MessageGeneratorImplTest : Spek({
generator.createMessageFlux(messageParameters)
.test()
.assertNext {
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name)
}
.expectNextCount(singleFluxSize - 1)
@@ -202,5 +202,5 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader =
fun extractHvRanMeasFields(bytes: ByteData): ByteString =
- VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields
+ VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 558bd1c1..3fde2c7e 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -23,7 +23,6 @@ import arrow.core.Either
import arrow.core.Some
import arrow.core.Try
import arrow.core.fix
-import arrow.core.flatMap
import arrow.core.monad
import arrow.effects.IO
import arrow.typeclasses.binding
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index d1a5296a..af71e9ce 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
@@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
}
.build()
- fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+ fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
- private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+ private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
@@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
}
private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<PayloadWireFrameMessage>,
+ messages: Flux<WireFrameMessage>,
nettyOutbound: NettyOutbound): Publisher<Void> {
val allocator = nettyOutbound.alloc()
@@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
.logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
@@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
companion object {
private val logger = Logger(VesHvClient::class)
private const val MAX_BATCH_SIZE = 128
- private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
}
}
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 80f39579..97535887 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
@@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({
// given
val json = "[true]".byteInputStream()
val messageParams = listOf<MessageParameters>()
- val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+ val generatedMessages = Flux.empty<WireFrameMessage>()
val sendingIo = IO {}
whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)