aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt (renamed from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt)19
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt2
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt30
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt6
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt (renamed from hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt)21
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt4
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt3
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt3
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt35
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt31
-rw-r--r--hv-collector-ct/pom.xml9
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt193
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt30
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt45
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt21
-rw-r--r--hv-collector-ct/src/test/resources/logback-test.xml4
-rw-r--r--req.json20
20 files changed, 401 insertions, 98 deletions
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
index 9ede62a3..f4c92fd4 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/spek_extensions.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
@@ -17,15 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.tests.component
+package org.onap.dcae.collectors.veshv.simulators.xnf.api
-import org.jetbrains.spek.api.dsl.Pending
-import org.jetbrains.spek.api.dsl.TestContainer
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import reactor.core.publisher.Flux
-internal fun TestContainer.system(description: String, body: (Sut) -> Unit) {
- test("system $description", body = { body(Sut()) })
-}
-
-internal fun TestContainer.xsystem(description: String, reason: String? = null, body: (Sut) -> Unit = {}) {
- test("system $description", Pending.Yes(reason), body = { body(Sut()) })
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+interface MessageGenerator {
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
index ed96e6c3..657ed317 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
@@ -25,7 +25,7 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-data class ClientConfiguration(
+internal data class ClientConfiguration(
val vesHost: String,
val vesPort: Int,
val security: SecurityConfiguration,
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt
new file mode 100644
index 00000000..dce386b1
--- /dev/null
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/factory.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+fun createMessageGenerator(): MessageGenerator = MessageGeneratorImpl(PayloadGenerator())
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index 3f872b51..c545ac8d 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -37,7 +37,7 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class HttpServer(private val vesClient: VesHvClient) {
+internal class HttpServer(private val vesClient: VesHvClient) {
fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
RatpackServer.start { server ->
@@ -69,7 +69,7 @@ class HttpServer(private val vesClient: VesHvClient) {
return ctx.request.body
.map { Json.createReader(it.inputStream).readObject() }
.map { extractMessageParameters(it) }
- .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+ .map { MessageGeneratorImpl.INSTANCE.createMessageFlux(it) }
}
private fun sendAcceptedResponse(ctx: Context) {
@@ -95,7 +95,7 @@ class HttpServer(private val vesClient: VesHvClient) {
private fun extractMessageParameters(request: JsonObject): MessageParameters =
try {
- val commonEventHeader = MessageFactory.INSTANCE
+ val commonEventHeader = MessageGeneratorImpl.INSTANCE
.parseCommonHeader(request.getJsonObject("commonEventHeader"))
val messagesAmount = request.getJsonNumber("messagesAmount").longValue()
MessageParameters(commonEventHeader, messagesAmount)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
index f731e11c..0d28bad0 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -32,9 +33,9 @@ import javax.json.JsonObject
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class MessageFactory(private val payloadGenerator: PayloadGenerator) {
+internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
if (messageParameters.amount < 0)
it.repeat()
@@ -65,16 +66,14 @@ class MessageFactory(private val payloadGenerator: PayloadGenerator) {
WireFrame(vesMessageBytes(commonHeader))
- private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray {
- val msg = VesEvent.newBuilder()
- .setCommonEventHeader(commonHeader)
- .setHvRanMeasFields(PayloadGenerator().generatePayload().toByteString())
- .build()
-
- return msg.toByteArray()
- }
+ private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
+ VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(payloadGenerator.generatePayload().toByteString())
+ .build()
+ .toByteArray()
companion object {
- val INSTANCE = MessageFactory(PayloadGenerator())
+ val INSTANCE = MessageGeneratorImpl(PayloadGenerator())
}
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt
index 17dbbf41..c8b97639 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/PayloadGenerator.kt
@@ -22,9 +22,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject
import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields.HVRanMeasPayload.PMObject.HVRanMeas
-import java.util.Random
+import java.util.*
-class PayloadGenerator {
+internal class PayloadGenerator {
private val randomGenerator = Random()
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index be351b50..43b73e1f 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -31,7 +31,6 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
-import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
@@ -43,7 +42,7 @@ import reactor.ipc.netty.tcp.TcpClient
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class VesHvClient(private val configuration: ClientConfiguration) {
+internal class VesHvClient(private val configuration: ClientConfiguration) {
private val client: TcpClient = TcpClient.builder()
.options { opts ->
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index f2229507..dbeba2b2 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -21,11 +21,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf
import arrow.core.Failure
import arrow.core.Success
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
-import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
import org.onap.dcae.collectors.veshv.utils.logging.Logger
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
index 2f592641..6f8a95a4 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
@@ -40,7 +40,7 @@ const val SAMPLE_LAST_EPOCH: Long = 120034455
object MessageFactoryTest : Spek({
describe("message factory") {
- val factory = MessageFactory.INSTANCE
+ val factory = MessageGeneratorImpl.INSTANCE
given("only common header") {
it("should return infinite flux") {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 033095ad..3246cf59 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -59,21 +59,28 @@ internal class VesHvCollector(
.compose(sink::send)
.doOnNext { metrics.notifyMessageSent(it.topic) }
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .onErrorResume(this::handleErrors)
.then()
}
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
- wireChunkDecoder.release()
- }
-
private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
mapper(input).fold(
{ Mono.empty() },
{ Mono.just(it) })
+ private fun handleErrors(ex: Throwable): Flux<RoutedMessage> {
+ logger.warn("Error while handling message stream: ${ex::class.qualifiedName} (${ex.message})")
+ logger.debug("Detailed stack trace", ex)
+ return Flux.empty()
+ }
+
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
+ wireChunkDecoder.release()
+ }
+
companion object {
- val logger = Logger(VesHvCollector::class)
+ private val logger = Logger(VesHvCollector::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index 056e0557..cfb61b3e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -31,9 +31,40 @@ import reactor.core.publisher.Flux
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class WireChunkDecoder(private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(
+ private val decoder: WireFrameDecoder,
+ alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
+
+// TODO: use this implementation and cleanup the rest
+// fun decode(byteBuf: ByteBuf): Flux<WireFrame> = Flux.defer<WireFrame> {
+// if (byteBuf.readableBytes() == 0) {
+// byteBuf.release()
+// Flux.empty()
+// } else {
+// streamBuffer.addComponent(true, byteBuf)
+// Flux.generate { next ->
+// try {
+// val frame = decodeFirstFrameFromBuffer()
+// if (frame == null)
+// next.complete()
+// else
+// next.next(frame)
+// } catch (ex: Exception) {
+// next.error(ex)
+// }
+// }
+// }
+// }.doOnTerminate { streamBuffer.discardReadComponents() }
+//
+//
+// private fun decodeFirstFrameFromBuffer(): WireFrame? =
+// try {
+// decoder.decodeFirst(streamBuffer)
+// } catch (ex: MissingWireFrameBytesException) {
+// logger.trace { "${ex.message} - waiting for more data" }
+// null
+// }
fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter
.createFlux(decoder, streamBuffer, byteBuf)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index abebff3d..540c647a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -35,24 +35,27 @@ internal class WireFrameSink(
private val streamBuffer: ByteBuf,
private val sink: FluxSink<WireFrame>,
private val requestedFrameCount: Long) {
+ private var completed = false
fun handleSubscriber() {
- logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+ if (!completed) {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
- try {
- if (requestedFrameCount == Long.MAX_VALUE) {
- logger.trace { "Push based strategy" }
- pushAvailableFrames()
- } else {
- logger.trace { "Pull based strategy - req $requestedFrameCount" }
- pushUpToNumberOfFrames()
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ completed = true
+ sink.error(ex)
}
- } catch (ex: Exception) {
- sink.error(ex)
- }
-
- logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+ }
}
private fun pushAvailableFrames() {
@@ -61,6 +64,7 @@ internal class WireFrameSink(
sink.next(nextFrame)
nextFrame = decodeFirstFrameFromBuffer()
}
+ completed = true
sink.complete()
}
@@ -76,6 +80,7 @@ internal class WireFrameSink(
}
}
if (remaining > 0 && nextFrame == null) {
+ completed = true
sink.complete()
}
}
diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml
index 1db0345c..63a5c093 100644
--- a/hv-collector-ct/pom.xml
+++ b/hv-collector-ct/pom.xml
@@ -64,8 +64,17 @@
<artifactId>hv-collector-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-client-simulator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-syntax</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
new file mode 100644
index 00000000..c68f0514
--- /dev/null
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -0,0 +1,193 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.tests.component
+
+import arrow.syntax.function.partially1
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import io.netty.buffer.CompositeByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
+import org.onap.dcae.collectors.veshv.simulators.xnf.createMessageGenerator
+import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
+import reactor.core.publisher.Flux
+import reactor.math.sum
+import java.security.MessageDigest
+import java.time.Duration
+import java.util.*
+import kotlin.system.measureTimeMillis
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object PerformanceSpecification : Spek({
+ describe("VES High Volume Collector performance") {
+ it("should handle multiple clients in reasonable time") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 300_000
+ val runs = 4
+ val timeout = Duration.ofMinutes((1 + (runs / 2)).toLong())
+
+ val params = MessageParameters(
+ commonEventHeader = vesEvent().commonEventHeader,
+ amount = numMessages)
+ val fluxes = (1.rangeTo(runs)).map {
+ sut.collector.handleConnection(sut.alloc, generateDataStream(sut.alloc, params))
+ }
+ val durationMs = measureTimeMillis {
+ Flux.merge(fluxes).then().block(timeout)
+ }
+
+ val durationSec = durationMs / 1000.0
+ val throughput = sink.count / durationSec
+ println("Processed $runs connections each containing $numMessages msgs.")
+ println("Forwarded ${sink.count / ONE_MILION} Mmsgs in $durationSec seconds, that is $throughput msgs/s")
+ assertThat(sink.count)
+ .describedAs("should send all events")
+ .isEqualTo(runs * numMessages)
+ }
+
+ it("should disconnect on transmission errors") {
+ val sink = CountingSink()
+ val sut = Sut(sink)
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+
+ val numMessages: Long = 100_000
+ val timeout = Duration.ofSeconds(30)
+
+ val params = MessageParameters(
+ commonEventHeader = vesEvent().commonEventHeader,
+ amount = numMessages)
+
+ val dataStream = generateDataStream(sut.alloc, params)
+ .transform(::dropWhenIndex.partially1 { it % 101 == 0L })
+ sut.collector.handleConnection(sut.alloc, dataStream)
+ .timeout(timeout)
+ .block()
+
+ println("Forwarded ${sink.count} msgs")
+ assertThat(sink.count)
+ .describedAs("should send up to number of events")
+ .isLessThan(numMessages)
+ }
+ }
+
+ describe("test infrastructure") {
+ val digest = MessageDigest.getInstance("MD5")
+
+ fun collectDigest(bb: ByteBuf) {
+ bb.markReaderIndex()
+ while (bb.isReadable) {
+ digest.update(bb.readByte())
+ }
+ bb.resetReaderIndex()
+ }
+
+ fun calculateDigest(arrays: List<ByteArray>): ByteArray {
+ for (array in arrays) {
+ digest.update(array)
+ }
+ return digest.digest()
+ }
+
+ it("should yield same bytes as in the input") {
+ val numberOfBuffers = 10
+ val singleBufferSize = 1000
+ val arrays = (1.rangeTo(numberOfBuffers)).map { randomByteArray(singleBufferSize) }
+ val inputDigest = calculateDigest(arrays)
+
+ val actualTotalSize = Flux.fromIterable(arrays)
+ .map { Unpooled.wrappedBuffer(it) }
+ .transform { simulateRemoteTcp(UnpooledByteBufAllocator.DEFAULT, 4, it) }
+ .doOnNext(::collectDigest)
+ .map {
+ val size = it.readableBytes()
+ it.release()
+ size
+ }
+ .sum()
+ .map(Long::toInt)
+ .block()
+
+ val outputDigest = digest.digest()
+
+ assertThat(actualTotalSize).isEqualTo(numberOfBuffers * singleBufferSize)
+ assertThat(outputDigest).isEqualTo(inputDigest)
+
+ }
+ }
+})
+
+
+private const val ONE_MILION = 1_000_000.0
+
+private val rand = Random()
+private fun randomByteArray(size: Int): ByteArray {
+ val bytes = ByteArray(size)
+ rand.nextBytes(bytes)
+ return bytes
+}
+
+fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<ByteBuf> =
+ stream.index()
+ .filter { predicate(it.t1) }
+ .map { it.t2 }
+
+private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
+ WireFrameEncoder(alloc).let { encoder ->
+ createMessageGenerator()
+ .createMessageFlux(params)
+ .map(encoder::encode)
+ .transform { simulateRemoteTcp(alloc, 1000, it) }
+ }
+
+private fun simulateRemoteTcp(alloc: ByteBufAllocator, maxSize: Int, byteBuffers: Flux<ByteBuf>) =
+ byteBuffers
+ .bufferTimeout(maxSize, Duration.ofMillis(250))
+ .map { joinBuffers(alloc, it) }
+ .concatMap { randomlySplitTcpFrames(it) }
+
+private fun joinBuffers(alloc: ByteBufAllocator, it: List<ByteBuf>?) =
+ alloc.compositeBuffer().addComponents(true, it)
+
+private fun randomlySplitTcpFrames(bb: CompositeByteBuf): Flux<ByteBuf> {
+ val targetFrameSize = Math.max(4, (bb.readableBytes() * Math.random()).toInt())
+ return Flux.create<ByteBuf> { sink ->
+ while (bb.isReadable) {
+ val frameSize = Math.min(targetFrameSize, bb.readableBytes())
+ sink.next(bb.retainedSlice(bb.readerIndex(), frameSize))
+ bb.readerIndex(bb.readerIndex() + frameSize)
+ }
+ bb.release()
+ sink.complete()
+ }
+}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 5099ae4c..44b3266e 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -22,14 +22,14 @@ package org.onap.dcae.collectors.veshv.tests.component
import io.netty.buffer.ByteBuf
import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.Exceptions
import reactor.core.publisher.Flux
import java.time.Duration
@@ -37,9 +37,9 @@ import java.time.Duration
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class Sut {
+class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
- val sink = FakeSink()
+
val alloc = UnpooledByteBufAllocator.DEFAULT
val metrics = FakeMetrics()
private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics)
@@ -47,21 +47,9 @@ internal class Sut {
val collector: Collector
get() = collectorProvider()
+}
- fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
- return sink.sentMessages
- }
-
- fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> =
- try {
- collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
- Pair(sink.sentMessages, null)
- } catch (ex: Exception) {
- Pair(sink.sentMessages, ex)
- }
-
- companion object {
- val logger = Logger(Sut::class)
- }
+fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ return sink.sentMessages
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 08b6382d..08450598 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,8 +23,10 @@ import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -34,9 +36,11 @@ import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
*/
object VesHvSpecification : Spek({
describe("VES High Volume Collector") {
- system("should handle multiple HV RAN events") { sut ->
+ it("should handle multiple HV RAN events") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS), vesMessage(Domain.HVRANMEAS))
assertThat(messages)
.describedAs("should send all events")
@@ -46,18 +50,18 @@ object VesHvSpecification : Spek({
describe("Memory management") {
- system("should release memory for each handled and dropped message") { sut ->
+ it("should release memory for each handled and dropped message") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
val msgWithInvalidFrame = invalidWireFrame()
val expectedRefCnt = 0
- val (handledEvents, exception) = sut.handleConnectionReturningError(
- validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
assertThat(handledEvents).hasSize(1)
- assertThat(exception).isNull()
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
@@ -71,17 +75,17 @@ object VesHvSpecification : Spek({
}
- system("should release memory for each message with invalid payload") { sut ->
+ it("should release memory for each message with invalid payload") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidPayload = invalidVesMessage()
val expectedRefCnt = 0
- val (handledEvents, exception) = sut.handleConnectionReturningError(
- validMessage, msgWithInvalidPayload)
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithInvalidPayload)
assertThat(handledEvents).hasSize(1)
- assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
@@ -92,18 +96,17 @@ object VesHvSpecification : Spek({
}
- system("should release memory for each message with garbage frame") { sut ->
+ it("should release memory for each message with garbage frame") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithGarbageFrame = garbageFrame()
val expectedRefCnt = 0
- val (handledEvents, exception) = sut.handleConnectionReturningError(
- validMessage, msgWithGarbageFrame)
+ val handledEvents = sut.handleConnection(sink, validMessage, msgWithGarbageFrame)
assertThat(handledEvents).hasSize(1)
- assertThat(exception?.cause)
- .isInstanceOf(InvalidWireFrameMarkerException::class.java)
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
@@ -116,10 +119,12 @@ object VesHvSpecification : Spek({
}
describe("message routing") {
- system("should direct message to a topic by means of routing configuration") { sut ->
+ it("should direct message to a topic by means of routing configuration") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(vesMessage(Domain.HVRANMEAS))
+ val messages = sut.handleConnection(sink, vesMessage(Domain.HVRANMEAS))
assertThat(messages).describedAs("number of routed messages").hasSize(1)
val msg = messages[0]
@@ -127,9 +132,11 @@ object VesHvSpecification : Spek({
assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
- system("should drop message if route was not found") { sut ->
+ it("should drop message if route was not found") {
+ val sink = StoringSink()
+ val sut = Sut(sink)
sut.configurationProvider.updateConfiguration(basicConfiguration)
- val messages = sut.handleConnection(
+ val messages = sut.handleConnection(sink,
vesMessage(Domain.OTHER, "first"),
vesMessage(Domain.HVRANMEAS, "second"),
vesMessage(Domain.HEARTBEAT, "third"))
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
index 3314c44f..8895d642 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
@@ -25,7 +25,7 @@ import io.netty.buffer.PooledByteBufAllocator
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import java.util.UUID
+import java.util.*
val allocator: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
@@ -61,7 +61,7 @@ fun invalidWireFrame() = allocator.buffer().run {
writeByte(0x01) // content type = GPB
}
-fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toString()) =
+fun vesEvent(domain: Domain = Domain.HVRANMEAS, id: String = UUID.randomUUID().toString()) =
VesEvent.newBuilder()
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index b0dbd0f5..a5fd546a 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -21,16 +21,16 @@ package org.onap.dcae.collectors.veshv.tests.fakes
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
import reactor.core.publisher.Flux
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class FakeSink : Sink {
+class StoringSink : Sink {
private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
val sentMessages: List<RoutedMessage>
@@ -40,3 +40,20 @@ class FakeSink : Sink {
return messages.doOnNext(sent::addLast)
}
}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CountingSink : Sink {
+ private val atomicCount = AtomicLong(0)
+
+ val count: Long
+ get() = atomicCount.get()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ return messages.doOnNext {
+ atomicCount.incrementAndGet()
+ }
+ }
+}
diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml
index 84abc9d3..93f22771 100644
--- a/hv-collector-ct/src/test/resources/logback-test.xml
+++ b/hv-collector-ct/src/test/resources/logback-test.xml
@@ -26,10 +26,10 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="ROLLING-FILE"/>
</root>
-</configuration> \ No newline at end of file
+</configuration>
diff --git a/req.json b/req.json
new file mode 100644
index 00000000..e092ed66
--- /dev/null
+++ b/req.json
@@ -0,0 +1,20 @@
+{
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": 10,
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name"
+ },
+ "messagesAmount": 1000000
+}