diff options
22 files changed, 201 insertions, 455 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index f608a2b9..8970e03e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -25,9 +25,6 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.UnknownWireFrameTypeException import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -35,8 +32,6 @@ import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.core.publisher.SynchronousSink -import java.util.function.BiConsumer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -53,7 +48,7 @@ internal class VesHvCollector( wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .transform { decodeWireFrame(it, wireDecoder) } - .filter(PayloadWireFrameMessage::isValid) + .filter(WireFrameMessage::isValid) .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) @@ -62,14 +57,13 @@ internal class VesHvCollector( .then() } - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<PayloadWireFrameMessage> = flux + private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } .concatMap(decoder::decode) - .handle(completeStreamOnEOT) .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - private fun decodePayload(flux: Flux<PayloadWireFrameMessage>): Flux<VesMessage> = flux - .map(PayloadWireFrameMessage::payload) + private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux + .map(WireFrameMessage::payload) .map(protobufDecoder::decode) .flatMap { omitWhenNone(it) } @@ -95,18 +89,5 @@ internal class VesHvCollector( companion object { private val logger = Logger(VesHvCollector::class) - - private val completeStreamOnEOT by lazy { - BiConsumer<WireFrameMessage, SynchronousSink<PayloadWireFrameMessage>> { frame, sink -> - when (frame) { - is EndOfTransmissionMessage -> { - logger.info("Completing stream because of receiving EOT message") - sink.complete() - } - is PayloadWireFrameMessage -> sink.next(frame) - else -> sink.error(UnknownWireFrameTypeException(frame)) - } - } - } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index 80f62d1a..0775c652 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -27,8 +27,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.SynchronousSink @@ -76,15 +74,9 @@ internal class WireChunkDecoder( } private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - when (frame) { - is PayloadWireFrameMessage -> IO { - logDecodedWireMessage(frame) - next.next(frame) - } - is EndOfTransmissionMessage -> IO { - logEndOfTransmissionWireMessage() - next.next(frame) - } + IO { + logDecodedWireMessage(frame) + next.next(frame) } } @@ -92,14 +84,10 @@ internal class WireChunkDecoder( logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } - private fun logDecodedWireMessage(wire: PayloadWireFrameMessage) { + private fun logDecodedWireMessage(wire: WireFrameMessage) { logger.trace { "Wire payload size: ${wire.payloadSize} B" } } - private fun logEndOfTransmissionWireMessage() { - logger.trace { "Received end-of-transmission message" } - } - private fun logEndOfData() { logger.trace { "End of data in current TCP buffer" } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index a9364ed3..d214ffcf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -27,13 +27,10 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import reactor.test.test -import kotlin.test.fail /** * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> @@ -46,7 +43,7 @@ internal object WireChunkDecoderTest : Spek({ val encoder = WireFrameEncoder(alloc) - fun WireChunkDecoder.decode(frame: PayloadWireFrameMessage) = decode(encoder.encode(frame)) + fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) @@ -101,23 +98,23 @@ internal object WireChunkDecoderTest : Spek({ } given("valid input") { - val input = PayloadWireFrameMessage(samplePayload) + val input = WireFrameMessage(samplePayload) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } } given("valid input with part of next frame") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload)).slice(0, 3)) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3)) it("should yield decoded input frame") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } @@ -126,30 +123,14 @@ internal object WireChunkDecoderTest : Spek({ } } - given("end-of-transmission marker byte with garbage after it") { - val input = Unpooled.buffer() - .writeByte(0xAA) - .writeBytes(Unpooled.wrappedBuffer(samplePayload)) - - it("should yield decoded end-of-transmission frame and error") { - createInstance().decode(input).test() - .expectNextMatches { it is EndOfTransmissionMessage } - .verifyError(WireFrameException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - given("valid input with garbage after it") { val input = Unpooled.buffer() - .writeBytes(encoder.encode(PayloadWireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) .writeBytes(Unpooled.wrappedBuffer(samplePayload)) it("should yield decoded input frame and error") { createInstance().decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyError(WireFrameException::class.java) } @@ -159,16 +140,16 @@ internal object WireChunkDecoderTest : Spek({ } given("two inputs containing two separate messages") { - val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val input2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val input1 = encoder.encode(WireFrameMessage(samplePayload)) + val input2 = encoder.encode(WireFrameMessage(anotherPayload)) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -177,57 +158,15 @@ internal object WireChunkDecoderTest : Spek({ } } - given("two payload messages followed by end-of-transmission marker byte") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) - - val input = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2) - .writeByte(0xAA) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } - .expectNextMatches { it is EndOfTransmissionMessage } - .verifyComplete() - } - } - - given("two payload messages separated by end-of-transmission marker byte") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) - - val input = Unpooled.buffer() - .writeBytes(frame1) - .writeByte(0xAA) - .writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { it is EndOfTransmissionMessage } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input) - } - } - given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) + val input1 = encoder.encode(WireFrameMessage(samplePayload)) val input2 = Unpooled.wrappedBuffer(anotherPayload) it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1) .test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() .verifyError(WireFrameException::class.java) @@ -244,8 +183,8 @@ internal object WireChunkDecoderTest : Spek({ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1) @@ -255,10 +194,10 @@ internal object WireChunkDecoderTest : Spek({ it("should yield decoded input frames") { val cut = createInstance() cut.decode(input1).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -268,8 +207,8 @@ internal object WireChunkDecoderTest : Spek({ } given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = encoder.encode(PayloadWireFrameMessage(samplePayload)) - val frame2 = encoder.encode(PayloadWireFrameMessage(anotherPayload)) + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1, 5) @@ -282,8 +221,8 @@ internal object WireChunkDecoderTest : Spek({ cut.decode(input1).test() .verifyComplete() cut.decode(input2).test() - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == samplePayload.size } - .expectNextMatches { castToPayloadMsgOrFail(it).payloadSize == anotherPayload.size } + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } .verifyComplete() } @@ -292,16 +231,4 @@ internal object WireChunkDecoderTest : Spek({ } } } -}) - - -private fun castToPayloadMsgOrFail(msg: WireFrameMessage): PayloadWireFrameMessage = - if (msg is PayloadWireFrameMessage) { - msg - } else { - fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${msg.javaClass}") - } - -private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - this as? EndOfTransmissionMessage - ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}")
\ No newline at end of file +})
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 60e10ee0..3eba9b6b 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -37,7 +37,6 @@ import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithDifferentRouting import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration -import org.onap.dcae.collectors.veshv.tests.utils.endOfTransmissionWireMessage import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload @@ -66,30 +65,6 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } - - it("should not handle messages received from client after end-of-transmission message") { - val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(HVMEAS) - val anotherValidMessage = vesWireFrameMessage(HVMEAS) - val endOfTransmissionMessage = endOfTransmissionWireMessage() - - val handledEvents = sut.handleConnection(sink, - validMessage, - endOfTransmissionMessage, - anotherValidMessage - ) - - assertThat(handledEvents).hasSize(1) - assertThat(validMessage.refCnt()) - .describedAs("first message should be released") - .isEqualTo(0) - assertThat(endOfTransmissionMessage.refCnt()) - .describedAs("end-of-transmission message should be released") - .isEqualTo(0) - assertThat(anotherValidMessage.refCnt()) - .describedAs("second (not handled) message should not be released") - .isEqualTo(1) - } } describe("Memory management") { @@ -116,26 +91,6 @@ object VesHvSpecification : Spek({ .isEqualTo(expectedRefCnt) } - it("should release memory for end-of-transmission message") { - val (sut, sink) = vesHvWithStoringSink() - val validMessage = vesWireFrameMessage(HVMEAS) - val endOfTransmissionMessage = endOfTransmissionWireMessage() - val expectedRefCnt = 0 - - val handledEvents = sut.handleConnection(sink, - validMessage, - endOfTransmissionMessage - ) - - assertThat(handledEvents).hasSize(1) - assertThat(validMessage.refCnt()) - .describedAs("handled message should be released") - .isEqualTo(expectedRefCnt) - assertThat(endOfTransmissionMessage.refCnt()) - .describedAs("end-of-transmission message should be released") - .isEqualTo(expectedRefCnt) - } - it("should release memory for each message with invalid payload") { val (sut, sink) = vesHvWithStoringSink() val validMessage = vesWireFrameMessage(HVMEAS) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 51f94cc4..38de5370 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -24,7 +24,7 @@ import arrow.effects.fix import arrow.effects.monadError import arrow.typeclasses.bindingCatch import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters @@ -77,7 +77,7 @@ class MessageStreamValidation( private fun generateEvents(parameters: List<MessageParameters>): IO<List<VesEventOuterClass.VesEvent>> = messageGenerator.createMessageFlux(parameters) - .map(PayloadWireFrameMessage::payload) + .map(WireFrameMessage::payload) .map(ByteData::unsafeAsArray) .map(VesEventOuterClass.VesEvent::parseFrom) .collectList() diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index 017360bb..34ec8f5a 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -179,6 +179,6 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P return VesEvent.newBuilder() .setCommonEventHeader(CommonEventHeader.newBuilder() .setEventId(eventId)) - .setHvMeasFields(ByteString.copyFrom(payload.toByteArray())) + .setEventFields(ByteString.copyFrom(payload.toByteArray())) .build() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index beef26b6..05fdd808 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -31,7 +31,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.mockito.ArgumentMatchers.anyList -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser @@ -89,7 +89,7 @@ internal class MessageStreamValidationTest : Spek({ // given val jsonAsStream = sampleJsonAsStream() val event = vesEvent() - val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) val receivedMessageBytes = event.toByteArray() givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.VALID, 1)) @@ -107,7 +107,7 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent(payload = "payload A") val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) @@ -125,7 +125,7 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent() val receivedEvent = vesEvent(eventId = "bbb") - val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.VALID, 1)) @@ -144,7 +144,7 @@ internal class MessageStreamValidationTest : Spek({ // given val jsonAsStream = sampleJsonAsStream() val event = vesEvent() - val generatedWireProtocolFrame = PayloadWireFrameMessage(event.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(event.toByteArray()) val receivedMessageBytes = event.toByteArray() givenParsedMessageParameters(MessageParameters(event.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) @@ -162,7 +162,7 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent(payload = "payload A") val receivedEvent = vesEvent(payload = "payload B") - val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) @@ -180,7 +180,7 @@ internal class MessageStreamValidationTest : Spek({ val jsonAsStream = sampleJsonAsStream() val generatedEvent = vesEvent() val receivedEvent = vesEvent("bbb") - val generatedWireProtocolFrame = PayloadWireFrameMessage(generatedEvent.toByteArray()) + val generatedWireProtocolFrame = WireFrameMessage(generatedEvent.toByteArray()) val receivedMessageBytes = receivedEvent.toByteArray() givenParsedMessageParameters(MessageParameters(generatedEvent.commonEventHeader, MessageType.FIXED_PAYLOAD, 1)) @@ -205,7 +205,7 @@ private fun vesEvent(eventId: String = DUMMY_EVENT_ID, payload: String = DUMMY_P return VesEvent.newBuilder() .setCommonEventHeader(CommonEventHeader.newBuilder() .setEventId(eventId)) - .setHvMeasFields(ByteString.copyFrom(payload.toByteArray())) + .setEventFields(ByteString.copyFrom(payload.toByteArray())) .build() } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index c61ab266..4f867f13 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,8 +24,8 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -33,19 +33,19 @@ import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.R */ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - fun encode(frame: PayloadWireFrameMessage): ByteBuf { - val bb = allocator.buffer(PayloadWireFrameMessage.HEADER_SIZE + frame.payload.size()) - - bb.writeByte(PayloadWireFrameMessage.MARKER_BYTE.toInt()) - bb.writeByte(frame.versionMajor.toInt()) - bb.writeByte(frame.versionMinor.toInt()) - bb.writeZero(RESERVED_BYTE_COUNT) - bb.writeByte(frame.payloadTypeRaw.toInt()) - bb.writeInt(frame.payloadSize) - frame.payload.writeTo(bb) - - return bb - } + fun encode(frame: WireFrameMessage): ByteBuf = allocator + .buffer(WireFrameMessage.HEADER_SIZE + frame.payload.size()) + .run { + writeByte(WireFrameMessage.MARKER_BYTE.toInt()) + writeByte(frame.versionMajor.toInt()) + writeByte(frame.versionMinor.toInt()) + writeZero(RESERVED_BYTE_COUNT) + writeByte(frame.payloadType.toInt()) + writeInt(frame.payloadSize) + } + .also { + frame.payload.writeTo(it) + } } /** @@ -57,36 +57,20 @@ class WireFrameDecoder { fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { isEmpty(byteBuf) -> Left(EmptyWireFrame) - isSingleByte(byteBuf) -> lookForEOTFrame(byteBuf) headerDoesNotFit(byteBuf) -> Left(MissingWireFrameHeaderBytes) else -> parseWireFrame(byteBuf) } private fun isEmpty(byteBuf: ByteBuf) = byteBuf.readableBytes() < 1 - private fun isSingleByte(byteBuf: ByteBuf) = byteBuf.readableBytes() == 1 - - private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < PayloadWireFrameMessage.HEADER_SIZE - - private fun lookForEOTFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, EndOfTransmissionMessage> { - byteBuf.markReaderIndex() - val byte = byteBuf.readUnsignedByte() - - return if (byte == EndOfTransmissionMessage.MARKER_BYTE) { - Right(EndOfTransmissionMessage) - } else { - byteBuf.resetReaderIndex() - Left(MissingWireFrameHeaderBytes) - } - } + private fun headerDoesNotFit(byteBuf: ByteBuf) = byteBuf.readableBytes() < WireFrameMessage.HEADER_SIZE private fun parseWireFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { byteBuf.markReaderIndex() val mark = byteBuf.readUnsignedByte() return when (mark) { - EndOfTransmissionMessage.MARKER_BYTE -> Right(EndOfTransmissionMessage) - PayloadWireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) + WireFrameMessage.MARKER_BYTE -> parsePayloadFrame(byteBuf) else -> { byteBuf.resetReaderIndex() Left(InvalidWireFrameMarker(mark)) @@ -94,7 +78,7 @@ class WireFrameDecoder { } } - private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, PayloadWireFrameMessage> { + private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { val versionMajor = byteBuf.readUnsignedByte() val versionMinor = byteBuf.readUnsignedByte() byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved @@ -113,7 +97,7 @@ class WireFrameDecoder { val payload = ByteData.readFrom(byteBuf, payloadSize) - return Right(PayloadWireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) + return Right(WireFrameMessage(payload, versionMajor, versionMinor, payloadTypeRaw, payloadSize)) } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index d82bb25f..dfadc5b8 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,7 +35,7 @@ sealed class InvalidWireFrame(msg: String) : WireFrameDecodingError(msg) class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( "Invalid start of frame. Expected 0x%02X, but was 0x%02X" - .format(PayloadWireFrameMessage.MARKER_BYTE, actualMarker) + .format(WireFrameMessage.MARKER_BYTE, actualMarker) ) object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") @@ -47,9 +47,3 @@ sealed class MissingWireFrameBytes(msg: String) : WireFrameDecodingError(msg) object MissingWireFrameHeaderBytes : MissingWireFrameBytes("readable bytes < header size") object MissingWireFramePayloadBytes : MissingWireFrameBytes("readable bytes < payload size") object EmptyWireFrame : MissingWireFrameBytes("empty wire frame") - - -// Other - -class UnknownWireFrameTypeException(frame: WireFrameMessage) - : Throwable("Unexpected wire frame message type: ${frame.javaClass}") diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 642179e1..06ca9383 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -20,10 +20,8 @@ package org.onap.dcae.collectors.veshv.domain -sealed class WireFrameMessage - /** - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). + * Wire frame structure is presented bellow using ASN.1 notation. All fields are in network byte order (big-endian). * * ``` * -- Precedes every HV-VES message @@ -31,21 +29,22 @@ sealed class WireFrameMessage * magic INTEGER (0..255), – always 0xFF, identifies extended header usage * versionMajor INTEGER (0..255), – major interface v, forward incompatible with previous major v * versionMinor INTEGER (0..255), – minor interface v, forward compatible with previous minor v - * reserved BIT STRING (SIZE (16)), – reserved for future use - * messageType INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf - * messageLength INTEGER (0..4294967295) – message payload length + * reserved OCTET STRING (SIZE (3)), – reserved for future use + * payloadId INTEGER (0..255), – message payload type: 0x00=undefined, 0x01=protobuf + * payloadLength INTEGER (0..4294967295) – message payload length + * payload OCTET STRING – length as per payloadLength * } * ``` * * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class PayloadWireFrameMessage(val payload: ByteData, - val versionMajor: Short, - val versionMinor: Short, - val payloadTypeRaw: Short, - val payloadSize: Int -) : WireFrameMessage() { +data class WireFrameMessage(val payload: ByteData, + val versionMajor: Short, + val versionMinor: Short, + val payloadType: Short, + val payloadSize: Int +) { constructor(payload: ByteArray) : this( ByteData(payload), SUPPORTED_VERSION_MAJOR, @@ -55,7 +54,7 @@ data class PayloadWireFrameMessage(val payload: ByteData, fun isValid(): Boolean = versionMajor == SUPPORTED_VERSION_MAJOR - && PayloadContentType.isValidHexValue(payloadTypeRaw) + && PayloadContentType.isValidHexValue(payloadType) && payload.size() == payloadSize companion object { @@ -74,24 +73,3 @@ data class PayloadWireFrameMessage(val payload: ByteData, const val MAX_PAYLOAD_SIZE = 1024 * 1024 } } - - -/** - * This message type should be used by client to indicate that he has finished sending data to collector. - * - * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). - * - * ``` - * -- Sent by the HV-VES data provider, prior to closing the connection to the HV-VES destination - * Eot ::= SEQUENCE { - * magic INTEGER (0..255), – always 0xAA - * } - * ``` - * - * @since July 2018 - */ - -object EndOfTransmissionMessage : WireFrameMessage() { - const val MARKER_BYTE: Short = 0xAA -} - diff --git a/hv-collector-domain/src/main/proto/event/VesEvent.proto b/hv-collector-domain/src/main/proto/event/VesEvent.proto index 54a6d149..0f9e5e1f 100644 --- a/hv-collector-domain/src/main/proto/event/VesEvent.proto +++ b/hv-collector-domain/src/main/proto/event/VesEvent.proto @@ -21,54 +21,55 @@ syntax = "proto3"; package org.onap.ves; message VesEvent { - CommonEventHeader commonEventHeader = 1; // required + CommonEventHeader commonEventHeader=1; // required - oneof eventFields // required, payload - { - // each new high-volume domain can add an entry for its own GPB message - // the field can be opaque (bytes) to allow decoding the payload in a separate step - bytes hvMeasFields = 2; // for domain==HVMEAS, GPB message: HVMeasFields - } + bytes eventFields=2; // required, payload + // this field contains a domain-specific GPB message + // the field being opaque (bytes), the decoding of the payload occurs in a separate step + // the name of the GPB message for domain XYZ is XYZFields + // e.g. for domain==HVMEAS, the GPB message is HVMEASFields } // VES CommonEventHeader adapted to GPB (Google Protocol Buffers) -// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain +// Aligned with VES 7.0.1 schema, and extending to hvMeas Domain. -message CommonEventHeader { - string version = 1; // required, "version of the gpb common event header" - string domain = 2; // required, "the eventing domain associated with the event", allowed values: - // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, - // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS +message CommonEventHeader +{ + string version = 1; // required, "version of the gpb common event header" + string domain = 2; // required, "the eventing domain associated with the event", allowed values: + // FAULT, HEARTBEAT, MEASUREMENT, MOBILE_FLOW, OTHER, PNFREGISTRATION, SIP_SIGNALING, + // STATE_CHANGE, SYSLOG, THRESHOLD_CROSSING_ALERT, VOICE_QUALITY, HVMEAS - uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" + uint32 sequence = 3; // required, "ordering of events communicated by an event source instance or 0 if not needed" - enum Priority { + enum Priority + { PRIORITY_NOT_PROVIDED = 0; HIGH = 1; MEDIUM = 2; NORMAL = 3; LOW = 4; } - Priority priority = 4; // required, "processing priority" + Priority priority = 4; // required, "processing priority" - string eventId = 5; // required, "event key that is unique to the event source" - string eventName = 6; // required, "unique event name" - string eventType = 7; // "for example - guest05, platform" + string eventId = 5; // required, "event key that is unique to the event source" + string eventName = 6; // required, "unique event name" + string eventType = 7; // "for example - guest05, platform" - uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 lastEpochMicrosec = 8; // required, "the latest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" + uint64 startEpochMicrosec = 9; // required, "the earliest unix time aka epoch time associated with the event from any component--as microseconds elapsed since 1 Jan 1970 not including leap seconds" - string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" - string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" - string nfVendorName = 12; // " Vendor Name providing the nf " + string nfNamingCode = 10; // "4 character network function type, aligned with vnf naming standards" + string nfcNamingCode = 11; // "3 character network function component type, aligned with vfc naming standards" + string nfVendorName = 12; // " Vendor Name providing the nf " - bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" - string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" - bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" - string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" - string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" - string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" + bytes reportingEntityId = 13; // "UUID identifying the entity reporting the event, for example an OAM VM; must be populated by the ATT enrichment process" + string reportingEntityName = 14; // required, "name of the entity reporting the event, for example, an EMS name; may be the same as sourceName should match A&AI entry" + bytes sourceId = 15; // "UUID identifying the entity experiencing the event issue; must be populated by the ATT enrichment process" + string sourceName = 16; // required, "name of the entity experiencing the event issued use A&AI entry" + string timeZoneOffset = 17; // "Offset to GMT to indicate local time zone for the device" + string vesEventListenerVersion = 18; // required, "Version of the VesEvent Listener" - reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" + reserved "InternalHeaderFields"; // "enrichment fields for internal VES Event Listener service use only, not supplied by event sources" reserved 100; } diff --git a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto index 9a8582d5..94b40106 100644 --- a/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto +++ b/hv-collector-domain/src/main/proto/measurements/HVMeasFields.proto @@ -24,20 +24,14 @@ import "MeasDataCollection.proto"; // for 3GPP PM format message HVMeasFields
{
string hvMeasFieldsVersion = 1;
- measDataCollection.MeasDataCollection measDataCollection = 2;
- // From 3GPP TS 28.550
+ MeasDataCollection measDataCollection = 2;
+ // Based on 3GPP TS 28.550
// Informative: mapping between similar header fields (format may be different)
- // 3GPP MeasStreamHeader ONAP/VES CommonEventHeader
+ // 3GPP MeasHeader ONAP/VES CommonEventHeader
// senderName sourceName
// senderType nfNamingCode + nfcNamingCode
// vendorName nfVendorName
// collectionBeginTime startEpochMicrosec
// timestamp lastEpochMicrosec
- repeated HashMap eventAddlFlds = 3; // optional per-event data
+ map<string, string> eventAddlFlds = 3; // optional per-event data (name/value HashMap)
}
-
-message HashMap
-{
- string name = 1;
- string value = 2;
-}
\ No newline at end of file diff --git a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto index 472dcc43..31f4dfb1 100644 --- a/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto +++ b/hv-collector-domain/src/main/proto/measurements/MeasDataCollection.proto @@ -18,87 +18,65 @@ * ============LICENSE_END=========================================================
*/
syntax = "proto3";
-package measDataCollection;
+package org.onap.ves;
-// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V1.2.2 (2018-08).
+// Definition for RTPM, structure aligned with 3GPP PM format optimized for RTPM delivery pre-standard TS 28.550 V2.0.0 (2018-09).
// Some field details are taken from 3GPP TS 32.436 V15.0.0 (2018-06) ASN.1 file.
-// Note (2018-08): work is in progress for 3GPP TS 28.550 to specify PM streaming format. Changes will be made, if needed, to align with final version.
+// Note (2018-09): work is in progress for 3GPP TS 28.550. Changes will be made, if needed, to align with final version.
// Differences/additions to 3GPP TS 28.550 are marked with "%%".
-message MeasDataCollection // top-level message
+message MeasDataCollection // top-level message
{
- MeasHeader measHeader = 1;
- repeated MeasData measData = 2; // %%: use a single instance for RTPM
- MeasFooter measFooter = 3;
-}
-
-message MeasHeader
-{
- string streamFormatVersion = 1;
+ // %% Combined messageFileHeader, measData (single instance), messageFileFooter (not needed: timestamp = collectionBeginTime + granularityPeriod).
+ string formatVersion = 1;
string senderName = 2;
string senderType = 3;
string vendorName = 4;
string collectionBeginTime = 5; // in ASN.1 GeneralizedTime format (subset of ISO 8601 basic format)
+ uint32 granularityPeriod = 6; // duration in seconds, %% moved from MeasInfo (single reporting period per event)
+ string measuredEntityUserName = 7; // network function user definable name ("userLabel") defined for the measured entity in 3GPP TS 28.622
+ string measuredEntityDn = 8; // DN as per 3GPP TS 32.300
+ string measuredEntitySoftwareVersion = 9;
+ repeated string measObjInstIdList = 10; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
+ repeated MeasInfo measInfo = 11;
}
-message MeasData
-{
- string measuredEntityId = 1; // DN as per 3GPP TS 32.300
- string measuredEntityUserName = 2; // network function User Name
- string measuredEntitySoftwareVersion = 3;
- uint32 granularityPeriod = 4; // in seconds, %% moved from MeasInfo (single reporting period per event)
- repeated string measObjInstIdList = 5; // %%: optional, monitored object LDNs as per 3GPP TS 32.300 and 3GPP TS 32.432
- repeated MeasInfo measInfo = 6;
-}
-
-
message MeasInfo
{
oneof MeasInfoId { // measurement group identifier
- uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
- string measInfoId = 2; // identifier as string (more generic)
+ uint32 iMeasInfoId = 1; // identifier as integer (%%: more compact)
+ string measInfoId = 2; // identifier as string (more generic)
}
oneof MeasTypes { // measurement identifiers associated with the measurement results
- IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
- SMeasTypes measTypes = 4; // identifiers as strings (more generic)
+ IMeasTypes iMeasTypes = 3; // identifiers as integers (%%: more compact)
+ SMeasTypes measTypes = 4; // identifiers as strings (more generic)
}
// Needed only because GPB does not support repeated fields directly inside 'oneof'
message IMeasTypes { repeated uint32 iMeasType = 1; }
message SMeasTypes { repeated string measType = 1; }
- string jobIdList = 5;
- repeated MeasValue measValues = 6; // performance measurements grouped by measurement groups
+ string jobId = 5;
+ repeated MeasValue measValues = 6; // performance measurements grouped by measurement object
}
message MeasValue
{
oneof MeasObjInstId { // monitored object LDN as per 3GPP TS 32.300 and 3GPP TS 32.432
- string measObjInstId = 1; // LDN itself
- uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
+ string measObjInstId = 1; // LDN itself
+ uint32 measObjInstIdListIdx = 2; // %%: index into measObjInstIdList
}
repeated MeasResult measResults = 3;
bool suspectFlag = 4;
- repeated nameValue measObjAddlFlds = 5; // %%: optional per-object data
+ map<string, string> measObjAddlFlds = 5; // %%: optional per-object data (name/value HashMap)
}
message MeasResult
{
- uint32 p = 1; // Optional index in the MeasTypes array
+ uint32 p = 1; // Index in the MeasTypes array, needed only if measResults has fewer elements than MeasTypes
oneof xValue {
sint64 iValue = 2;
double rValue = 3;
bool isNull = 4;
}
}
-
-message MeasFooter
-{
- string timestamp = 1; // in ASN.1 GeneralizedTime format, a better name would be "collectionEndTime"
-}
-
-message nameValue // %%: vendor-defined name-value pair
-{
- string name = 1;
- string value = 2;
-}
\ No newline at end of file diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index b992d530..988789d2 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -28,7 +28,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -42,8 +42,7 @@ object WireFrameCodecsTest : Spek({ val encoder = WireFrameEncoder() val decoder = WireFrameDecoder() - fun createSampleFrame() = - PayloadWireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) + fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { @@ -53,11 +52,11 @@ object WireFrameCodecsTest : Spek({ describe("Wire Frame invariants") { given("input with unsupported major version") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 100, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 0) it("should fail validation") { @@ -66,11 +65,11 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported minor version") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 1, versionMinor = 6, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 0) it("should pass validation") { @@ -79,11 +78,11 @@ object WireFrameCodecsTest : Spek({ } given("input with unsupported payload type") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData.EMPTY, versionMajor = 1, versionMinor = 0, - payloadTypeRaw = 0x69, + payloadType = 0x69, payloadSize = 0) it("should fail validation") { @@ -92,11 +91,11 @@ object WireFrameCodecsTest : Spek({ } given("input with too small payload size") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 1) it("should fail validation") { @@ -105,11 +104,11 @@ object WireFrameCodecsTest : Spek({ } given("input with too big payload size") { - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(byteArrayOf(1, 2, 3)), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = 8) it("should fail validation") { @@ -119,11 +118,11 @@ object WireFrameCodecsTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) it("should pass validation") { @@ -139,14 +138,18 @@ object WireFrameCodecsTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = decoder.decodeFirst(encoded).getPayloadMessageOrFail() + val decoded = decoder.decodeFirst(encoded).getMessageOrFail() - it("should decode version") { + it("should decode major version") { assertThat(decoded.versionMajor).isEqualTo(frame.versionMajor) } + it("should decode minor version") { + assertThat(decoded.versionMinor).isEqualTo(frame.versionMinor) + } + it("should decode payload type") { - assertThat(decoded.payloadTypeRaw).isEqualTo(frame.payloadTypeRaw) + assertThat(decoded.payloadType).isEqualTo(frame.payloadType) } it("should decode payload size") { @@ -170,14 +173,7 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return end-of-transmission message when given end-of-transmission marker byte") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - - assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) - } - - it("should return error when given any single byte other than end-of-transmission marker byte") { + it("should return error when given any single byte other than marker byte") { val buff = Unpooled.buffer() .writeByte(0xEE) @@ -194,7 +190,7 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return error when length looks ok but first byte is not 0xFF or 0xAA") { + it("should return error when length looks ok but first byte is not 0xFF") { val buff = Unpooled.buffer() .writeByte(0x69) .writeBytes("some garbage".toByteArray()) @@ -203,14 +199,6 @@ object WireFrameCodecsTest : Spek({ assertBufferIntact(buff) } - it("should return end-of-transmission message when length looks ok and first byte is 0xAA") { - val buff = Unpooled.buffer() - .writeByte(0xAA) - .writeBytes("some garbage".toByteArray()) - - assertIsEndOfTransmissionMessage(decoder.decodeFirst(buff)) - } - it("should return error when payload doesn't fit") { val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) @@ -223,8 +211,8 @@ object WireFrameCodecsTest : Spek({ it("should decode payload message leaving rest unread") { val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) - .writeByte(0xAA) - val decoded = decoder.decodeFirst(buff).getPayloadMessageOrFail() + .writeByte(0xAB) + val decoded = decoder.decodeFirst(buff).getMessageOrFail() assertThat(decoded.isValid()).describedAs("should be valid").isTrue() assertThat(buff.readableBytes()).isEqualTo(1) @@ -236,11 +224,11 @@ object WireFrameCodecsTest : Spek({ it("should decode successfully when payload size is equal 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -250,11 +238,11 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) val buff = encoder.encode(input) @@ -266,11 +254,11 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { val payload = ByteArray(MAX_PAYLOAD_SIZE) - val input = PayloadWireFrameMessage( + val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, versionMinor = 0, - payloadTypeRaw = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, payloadSize = payload.size) @@ -289,21 +277,6 @@ private fun <A, B> Either<A, B>.assertFailedWithError(assertj: (ObjectAssert<A>) fold({ assertj(assertThat(it)) }, { fail("Error expected") }) } -private fun Either<WireFrameDecodingError, WireFrameMessage>.getPayloadMessageOrFail(): PayloadWireFrameMessage = - fold({ fail(it.message) }, { it.castToPayloadMsgOrFail() }) - -private fun WireFrameMessage.castToPayloadMsgOrFail(): PayloadWireFrameMessage = - this as? PayloadWireFrameMessage - ?: fail("Decoded message had unexpected type, expecting: PayloadWireFrameMessage, but was: ${this.javaClass}") - - -private fun assertIsEndOfTransmissionMessage(decoded: Either<WireFrameDecodingError, WireFrameMessage>) { - decoded.getEndOfTransmissionMessageOrFail() -} - -private fun Either<WireFrameDecodingError, WireFrameMessage>.getEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - fold({ fail(it.message) }, { it.castToEndOfTransmissionMessageOrFail() }) +private fun Either<WireFrameDecodingError, WireFrameMessage>.getMessageOrFail(): WireFrameMessage = + fold({ fail(it.message) }, { it }) -private fun WireFrameMessage.castToEndOfTransmissionMessageOrFail(): EndOfTransmissionMessage = - this as? EndOfTransmissionMessage - ?: fail("Decoded message had unexpected type, expecting: EndOfTransmissionMessage, but was: ${this.javaClass}") diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index 78042260..85bdcab7 100644 --- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt @@ -23,8 +23,8 @@ import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage.Companion.RESERVED_BYTE_COUNT +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER @@ -52,9 +52,6 @@ fun vesWireFrameMessage(domain: VesEventDomain = OTHER, writeBytes(gpb) // ves event as GPB bytes } -fun endOfTransmissionWireMessage(): ByteBuf = - allocator.buffer().writeByte(0xAA) - fun wireFrameMessageWithInvalidPayload(): ByteBuf = allocator.buffer().run { writeValidWireFrameHeaders() @@ -69,8 +66,8 @@ fun garbageFrame(): ByteBuf = allocator.buffer().run { fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0xFF) - writeByte(0x01) // version - writeByte(0x01) // content type = GPB + writeByte(0x01) // version major + writeByte(0x01) // version minor } fun vesMessageWithTooBigPayload(domain: VesEventDomain = HVMEAS): ByteBuf = diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index 0341c2ff..57b960af 100644 --- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -39,7 +39,7 @@ fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasFields: ByteString = ByteString.EMPTY): VesEventOuterClass.VesEvent = VesEventOuterClass.VesEvent.newBuilder() .setCommonEventHeader(commonEventHeader) - .setHvMeasFields(hvRanMeasFields) + .setEventFields(hvRanMeasFields) .build() fun commonHeader(domain: VesEventDomain = HVMEAS, diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt index d9329cb0..ace7f1cb 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.api -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator import reactor.core.publisher.Flux @@ -29,7 +29,7 @@ import reactor.core.publisher.Flux * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> + fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> companion object { val INSTANCE: MessageGenerator by lazy { diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index 5d1f56dc..90e7770b 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.domain.PayloadContentType -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType @@ -44,11 +44,11 @@ import java.nio.charset.Charset */ class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<PayloadWireFrameMessage> = Flux + override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux .fromIterable(messageParameters) .flatMap { createMessageFlux(it) } - private fun createMessageFlux(parameters: MessageParameters): Flux<PayloadWireFrameMessage> = + private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> = Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) } .let { if (parameters.amount < 0) @@ -57,17 +57,17 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa it.repeat(parameters.amount) } - private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): PayloadWireFrameMessage = + private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage = when (messageType) { VALID -> - PayloadWireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) + WireFrameMessage(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) TOO_BIG_PAYLOAD -> - PayloadWireFrameMessage(vesEvent(commonEventHeader, oversizedPayload())) + WireFrameMessage(vesEvent(commonEventHeader, oversizedPayload())) FIXED_PAYLOAD -> - PayloadWireFrameMessage(vesEvent(commonEventHeader, fixedPayload())) + WireFrameMessage(vesEvent(commonEventHeader, fixedPayload())) INVALID_WIRE_FRAME -> { val payload = ByteData(vesEvent(commonEventHeader, payloadGenerator.generatePayload())) - PayloadWireFrameMessage( + WireFrameMessage( payload, UNSUPPORTED_VERSION, UNSUPPORTED_VERSION, @@ -75,7 +75,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa payload.size()) } INVALID_GPB_DATA -> - PayloadWireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) + WireFrameMessage("invalid vesEvent".toByteArray(Charset.defaultCharset())) } private fun vesEvent(commonEventHeader: CommonEventHeader, hvRanMeasPayload: ByteString): ByteArray { @@ -85,11 +85,11 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa private fun createVesEvent(commonEventHeader: CommonEventHeader, payload: ByteString): VesEvent = VesEvent.newBuilder() .setCommonEventHeader(commonEventHeader) - .setHvMeasFields(payload) + .setEventFields(payload) .build() private fun oversizedPayload() = - payloadGenerator.generateRawPayload(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE + 1) + payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1) private fun fixedPayload() = payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE) diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt index ea3d094a..e380f931 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt @@ -29,7 +29,7 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HVMEAS import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT @@ -87,7 +87,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } .verifyComplete() @@ -105,7 +105,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) } .verifyComplete() @@ -122,7 +122,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThatExceptionOfType(InvalidProtocolBufferException::class.java) .isThrownBy { extractCommonEventHeader(it.payload) } } @@ -140,9 +140,9 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isFalse() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) - assertThat(it.versionMajor).isNotEqualTo(PayloadWireFrameMessage.SUPPORTED_VERSION_MINOR) + assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR) } .verifyComplete() } @@ -158,7 +158,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractHvRanMeasFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } @@ -177,17 +177,17 @@ object MessageGeneratorImplTest : Spek({ generator.createMessageFlux(messageParameters) .test() .assertNext { - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HVMEAS.name) } .expectNextCount(singleFluxSize - 1) .assertNext { - assertThat(it.payloadSize).isGreaterThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.name) } .expectNextCount(singleFluxSize - 1) .assertNext { - assertThat(it.payloadSize).isLessThan(PayloadWireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.name) } .expectNextCount(singleFluxSize - 1) @@ -202,5 +202,5 @@ fun extractCommonEventHeader(bytes: ByteData): CommonEventHeader = fun extractHvRanMeasFields(bytes: ByteData): ByteString = - VesEvent.parseFrom(bytes.unsafeAsArray()).hvMeasFields + VesEvent.parseFrom(bytes.unsafeAsArray()).eventFields diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 558bd1c1..3fde2c7e 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -23,7 +23,6 @@ import arrow.core.Either import arrow.core.Some import arrow.core.Try import arrow.core.fix -import arrow.core.flatMap import arrow.core.monad import arrow.effects.IO import arrow.typeclasses.binding diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index d1a5296a..af71e9ce 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration @@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() - private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, + messages: Flux<WireFrameMessage>, nettyOutbound: NettyOutbound): Publisher<Void> { val allocator = nettyOutbound.alloc() @@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { companion object { private val logger = Logger(VesHvClient::class) private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE } } diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 80f39579..97535887 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat @@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({ // given val json = "[true]".byteInputStream() val messageParams = listOf<MessageParameters>() - val generatedMessages = Flux.empty<PayloadWireFrameMessage>() + val generatedMessages = Flux.empty<WireFrameMessage>() val sendingIo = IO {} whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) |