aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-ct/src/test/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-ct/src/test/kotlin')
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt50
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt94
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt31
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt81
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt52
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt43
6 files changed, 351 insertions, 0 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
new file mode 100644
index 00000000..f8eefa5a
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import reactor.core.publisher.Flux
+import java.time.Duration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class Sut {
+ val configurationProvider = FakeConfigurationProvider()
+ val sink = FakeSink()
+ private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
+ val collectorProvider = collectorFactory.createVesHvCollectorProvider()
+
+ val collector: Collector
+ get() = collectorProvider()
+
+ fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+
+ return sink.sentMessages
+ }
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
new file mode 100644
index 00000000..2cfb785e
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -0,0 +1,94 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object VesHvSpecification : Spek({
+ describe("VES High Volume Collector") {
+ system("should handle multiple HV RAN events") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+
+ assertThat(messages)
+ .describedAs("should send all events")
+ .hasSize(2)
+ }
+
+ system("should release memory for each incoming message") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val msgWithInvalidDomain = vesMessage(Domain.OTHER)
+ val msgWithInvalidPayload = invalidVesMessage()
+ val msgWithInvalidFrame = invalidWireFrame()
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+
+ sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+
+ assertThat(msgWithInvalidDomain.refCnt())
+ .describedAs("message with invalid domain should be released")
+ .isEqualTo(0)
+ assertThat(msgWithInvalidPayload.refCnt())
+ .describedAs("message with invalid payload should be released")
+ .isEqualTo(0)
+ assertThat(msgWithInvalidFrame.refCnt())
+ .describedAs("message with invalid frame should be released")
+ .isEqualTo(0)
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(0)
+ }
+ }
+
+ describe("message routing") {
+ system("should direct message to a topic by means of routing configuration") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS))
+ assertThat(messages).describedAs("number of routed messages").hasSize(1)
+
+ val msg = messages[0]
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1)
+ }
+
+ system("should drop message if route was not found") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val messages = sut.handleConnection(
+ vesMessage(Domain.OTHER, "first"),
+ vesMessage(Domain.HVRANMEAS, "second"),
+ vesMessage(Domain.HEARTBEAT, "third"))
+
+ assertThat(messages).describedAs("number of routed messages").hasSize(1)
+
+ val msg = messages[0]
+ assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
+ assertThat(msg.message.header.eventId).describedAs("routed message eventId").isEqualTo("second")
+ }
+ }
+})
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt
new file mode 100644
index 00000000..c30ba7e1
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import org.jetbrains.spek.api.dsl.Pending
+import org.jetbrains.spek.api.dsl.TestContainer
+
+internal fun TestContainer.system(description: String, body: (Sut) -> Unit) {
+ test("system $description", body = { body(Sut()) })
+}
+
+internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) {
+ test("system $description", Pending.Yes(reason), body = { body(Sut()) })
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
new file mode 100644
index 00000000..e4958fdb
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
@@ -0,0 +1,81 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.PooledByteBufAllocator
+import io.netty.buffer.Unpooled
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import java.nio.charset.Charset
+import java.util.*
+
+val alocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
+
+fun vesMessage(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) = alocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(1) // major version
+ writeByte(0) // minor version
+
+ val gpb = vesEvent(domain, id).toByteString().asReadOnlyByteBuffer()
+ writeInt(gpb.limit()) // ves event size in bytes
+ writeBytes(gpb) // ves event as GPB bytes
+}
+
+
+fun invalidVesMessage() = alocator.buffer().run {
+ writeByte(0xFF) // always 0xFF
+ writeByte(1) // major version
+ writeByte(0) // minor version
+
+ val invalidGpb = "some random data".toByteArray(Charsets.UTF_8)
+ writeInt(invalidGpb.size) // ves event size in bytes
+ writeBytes(invalidGpb)
+
+}
+
+fun invalidWireFrame() = alocator.buffer().run {
+ writeByte(0xFF)
+ writeByte(1)
+ writeByte(0)
+}
+
+fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) =
+ VesEvent.newBuilder()
+ .setCommonEventHeader(
+ CommonEventHeader.getDefaultInstance().toBuilder()
+ .setVersion("1.0")
+ .setEventId(id)
+ .setDomain(domain)
+ .setEventName("Sample event name")
+ .setSourceName("Sample Source")
+ .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
+ .setPriority(CommonEventHeader.Priority.MEDIUM)
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034459)
+ .setSequence(1))
+ .setHvRanMeasFields(ByteString.EMPTY)
+ .build()
+
+
+
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
new file mode 100644
index 00000000..728e0c5c
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/configuration.kt
@@ -0,0 +1,52 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.routing
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.FluxProcessor
+import reactor.core.publisher.UnicastProcessor
+
+
+const val HVRANMEAS_TOPIC = "ves_hvRanMeas"
+
+val basicConfiguration: CollectorConfiguration = CollectorConfiguration(
+ kafkaBootstrapServers = "localhost:9969",
+ routing = routing {
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic(HVRANMEAS_TOPIC)
+ withFixedPartitioning()
+ }
+ }.build()
+)
+
+
+class FakeConfigurationProvider : ConfigurationProvider {
+ private val configStream: FluxProcessor<CollectorConfiguration, CollectorConfiguration> = UnicastProcessor.create()
+
+ fun updateConfiguration(collectorConfiguration: CollectorConfiguration) {
+ configStream.onNext(collectorConfiguration)
+ }
+
+ override fun invoke() = configStream
+} \ No newline at end of file
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
new file mode 100644
index 00000000..7da72151
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -0,0 +1,43 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.fakes
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import java.util.*
+import java.util.concurrent.ConcurrentLinkedDeque
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class FakeSink : Sink {
+ private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+
+ val sentMessages: List<RoutedMessage>
+ get() = sent.toList()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+ return messages.doOnNext(sent::addLast).map { it.message }
+ }
+}