diff options
Diffstat (limited to 'hv-collector-ct/src/test/kotlin')
6 files changed, 351 insertions, 0 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt new file mode 100644 index 00000000..f8eefa5a --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.boundary.Collector +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import reactor.core.publisher.Flux +import java.time.Duration + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class Sut { + val configurationProvider = FakeConfigurationProvider() + val sink = FakeSink() + private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) + val collectorProvider = collectorFactory.createVesHvCollectorProvider() + + val collector: Collector + get() = collectorProvider() + + fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { + collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + + return sink.sentMessages + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt new file mode 100644 index 00000000..2cfb785e --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -0,0 +1,94 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object VesHvSpecification : Spek({ + describe("VES High Volume Collector") { + system("should handle multiple HV RAN events") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS)) + + assertThat(messages) + .describedAs("should send all events") + .hasSize(2) + } + + system("should release memory for each incoming message") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val msgWithInvalidDomain = vesMessage(Domain.OTHER) + val msgWithInvalidPayload = invalidVesMessage() + val msgWithInvalidFrame = invalidWireFrame() + val validMessage = vesMessage(Domain.HVRANMEAS) + + sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) + + assertThat(msgWithInvalidDomain.refCnt()) + .describedAs("message with invalid domain should be released") + .isEqualTo(0) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(0) + assertThat(msgWithInvalidFrame.refCnt()) + .describedAs("message with invalid frame should be released") + .isEqualTo(0) + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(0) + } + } + + describe("message routing") { + system("should direct message to a topic by means of routing configuration") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + + val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS)) + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1) + } + + system("should drop message if route was not found") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val messages = sut.handleConnection( + vesMessage(Domain.OTHER, "first"), + vesMessage(Domain.HVRANMEAS, "second"), + vesMessage(Domain.HEARTBEAT, "third")) + + assertThat(messages).describedAs("number of routed messages").hasSize(1) + + val msg = messages[0] + assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) + assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second") + } + } +}) diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt new file mode 100644 index 00000000..c30ba7e1 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import org.jetbrains.spek.api.dsl.Pending +import org.jetbrains.spek.api.dsl.TestContainer + +internal fun TestContainer.system(description: String, body: (Sut) -> Unit) { + test("system $description", body = { body(Sut()) }) +} + +internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) { + test("system $description", Pending.Yes(reason), body = { body(Sut()) }) +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt new file mode 100644 index 00000000..e4958fdb --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -0,0 +1,81 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.component + +import com.google.protobuf.ByteString +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import io.netty.buffer.PooledByteBufAllocator +import io.netty.buffer.Unpooled +import org.onap.ves.VesEventV5.VesEvent +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import java.nio.charset.Charset +import java.util.* + +val alocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT + +fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer() + writeInt(gpb.limit()) // ves event size in bytes + writeBytes(gpb) // ves event as GPB bytes +} + + +fun invalidVesMessage() = alocator.buffer().run { + writeByte(0xFF) // always 0xFF + writeByte(1) // major version + writeByte(0) // minor version + + val invalidGpb = "some random data".toByteArray(Charsets.UTF_8) + writeInt(invalidGpb.size) // ves event size in bytes + writeBytes(invalidGpb) + +} + +fun invalidWireFrame() = alocator.buffer().run { + writeByte(0xFF) + writeByte(1) + writeByte(0) +} + +fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = + VesEvent.newBuilder() + .setCommonEventHeader( + CommonEventHeader.getDefaultInstance().toBuilder() + .setVersion("1.0") + .setEventId(id) + .setDomain(domain) + .setEventName("Sample event name") + .setSourceName("Sample Source") + .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String")) + .setPriority(CommonEventHeader.Priority.MEDIUM) + .setStartEpochMicrosec(120034455) + .setLastEpochMicrosec(120034459) + .setSequence(1)) + .setHvRanMeasFields(ByteString.EMPTY) + .build() + + + diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt new file mode 100644 index 00000000..728e0c5c --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt @@ -0,0 +1,52 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.routing +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor + + +const val HVRANMEAS_TOPIC = "ves_hvRanMeas" + +val basicConfiguration: CollectorConfiguration = CollectorConfiguration( + kafkaBootstrapServers = "localhost:9969", + routing = routing { + defineRoute { + fromDomain(Domain.HVRANMEAS) + toTopic(HVRANMEAS_TOPIC) + withFixedPartitioning() + } + }.build() +) + + +class FakeConfigurationProvider : ConfigurationProvider { + private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create() + + fun updateConfiguration(collectorConfiguration: CollectorConfiguration) { + configStream.onNext(collectorConfiguration) + } + + override fun invoke() = configStream +}
\ No newline at end of file diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt new file mode 100644 index 00000000..7da72151 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt @@ -0,0 +1,43 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import java.util.* +import java.util.concurrent.ConcurrentLinkedDeque + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +class FakeSink : Sink { + private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque() + + val sentMessages: List<RoutedMessage> + get() = sent.toList() + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { + return messages.doOnNext(sent::addLast).map { it.message } + } +} |