From 03702b48989174dc8afa855e663a28e34b4da67b Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 28 Jun 2018 14:42:05 +0200 Subject: Use Either instead of exceptions in frame decoder Goals: * Make code cleaner (in a FP way) * Avoid costly exception throw each time we wait for the rest of the frame (collecting stack traces is costly and we do not need them anyway) Closes ONAP-437 Change-Id: I40341d3c2cb85f3ff581d89167245cb009dbb070 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../tests/component/PerformanceSpecification.kt | 9 +-- .../veshv/tests/component/VesHvSpecification.kt | 4 +- .../collectors/veshv/tests/component/messages.kt | 80 ++++++++++++++++++++++ .../collectors/veshv/tests/component/spekUtils.kt | 38 ++++++++++ .../dcae/collectors/veshv/tests/component/utils.kt | 80 ---------------------- 5 files changed, 125 insertions(+), 86 deletions(-) create mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt create mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt delete mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt (limited to 'hv-collector-ct') diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index c68f0514..00739fa4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -41,12 +41,13 @@ import java.time.Duration import java.util.* import kotlin.system.measureTimeMillis - /** * @author Piotr Jaszczyk * @since May 2018 */ object PerformanceSpecification : Spek({ + debugRx(false) + describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() @@ -69,8 +70,8 @@ object PerformanceSpecification : Spek({ val durationSec = durationMs / 1000.0 val throughput = sink.count / durationSec - println("Processed $runs connections each containing $numMessages msgs.") - println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") + logger.info("Processed $runs connections each containing $numMessages msgs.") + logger.info("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s") assertThat(sink.count) .describedAs("should send all events") .isEqualTo(runs * numMessages) @@ -94,7 +95,7 @@ object PerformanceSpecification : Spek({ .timeout(timeout) .block() - println("Forwarded ${sink.count} msgs") + logger.info("Forwarded ${sink.count} msgs") assertThat(sink.count) .describedAs("should send up to number of events") .isLessThan(numMessages) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 08450598..49eeddaa 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -19,12 +19,10 @@ */ package org.onap.dcae.collectors.veshv.tests.component -import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration @@ -35,6 +33,8 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain * @since May 2018 */ object VesHvSpecification : Spek({ + debugRx(false) + describe("VES High Volume Collector") { it("should handle multiple HV RAN events") { val sink = StoringSink() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt new file mode 100644 index 00000000..8895d642 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/messages.kt @@ -0,0 +1,80 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.PooledByteBufAllocator +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import java.util.* + +val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT + +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB + + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes +} + + +fun invalidVesMessage() = allocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(0x01) // version + writeByte(0x01) // content type = GPB + + val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) + writeInt(invalidGpb.size) // ves event size in bytes + writeBytes(invalidGpb) + +} + +fun garbageFrame() = allocator.buffer().run { + writeBytes("the meaning of life is &@)(*_!".toByteArray()) +} + +fun invalidWireFrame() = allocator.buffer().run { + writeByte(0xFF) + writeByte(0x01) // version + writeByte(0x01) // content type = GPB +} + +fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) = + VesEvent.newBuilder() + .setCommonEventHeader( + CommonEventHeader.getDefaultInstance().toBuilder() + .setVersion("1.0") + .setEventName("xyz") + .setEventId(id) + .setDomain(domain) + .setEventName("Sample event name") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(1)) + .setHvRanMeasFields(ByteString.EMPTY) + .build() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt new file mode 100644 index 00000000..29df8c70 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spekUtils.kt @@ -0,0 +1,38 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.jetbrains.spek.api.dsl.SpecBody +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Hooks + +fun SpecBody.debugRx(debug: Boolean = true) { + if (debug) { + beforeGroup { + Hooks.onOperatorDebug() + } + + afterGroup { + Hooks.resetOnOperatorDebug() + } + } +} + +val logger = Logger("org.onap.dcae.collectors.veshv.tests.component") diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt deleted file mode 100644 index 8895d642..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.component - -import com.google.protobuf.ByteString -import io.netty.buffer.ByteBufAllocator -import io.netty.buffer.PooledByteBufAllocator -import org.onap.ves.VesEventV5.VesEvent -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import java.util.* - -val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT - -fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB - - val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() - writeInt(gpb.limit()) // ves event size in bytes - writeBytes(gpb) // ves event as GPB bytes -} - - -fun invalidVesMessage() = allocator.buffer().run { - writeByte(0xFF) // always 0xFF - writeByte(0x01) // version - writeByte(0x01) // content type = GPB - - val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) - writeInt(invalidGpb.size) // ves event size in bytes - writeBytes(invalidGpb) - -} - -fun garbageFrame() = allocator.buffer().run { - writeBytes("the meaning of life is &@)(*_!".toByteArray()) -} - -fun invalidWireFrame() = allocator.buffer().run { - writeByte(0xFF) - writeByte(0x01) // version - writeByte(0x01) // content type = GPB -} - -fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) = - VesEvent.newBuilder() - .setCommonEventHeader( - CommonEventHeader.getDefaultInstance().toBuilder() - .setVersion("1.0") - .setEventName("xyz") - .setEventId(id) - .setDomain(domain) - .setEventName("Sample event name") - .setSourceName("Sample Source") - .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) - .setPriority(CommonEventHeader.Priority.MEDIUM) - .setStartEpochMicrosec(120034455) - .setLastEpochMicrosec(120034459) - .setSequence(1)) - .setHvRanMeasFields(ByteString.EMPTY) - .build() -- cgit 1.2.3-korg