aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-client-simulator/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f /hv-collector-client-simulator/src
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-client-simulator/src')
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt9
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt2
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt18
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt5
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt4
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt17
6 files changed, 29 insertions, 26 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
index 49653b57..b946689f 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
@@ -28,7 +28,7 @@ import java.io.File
import java.nio.file.Paths
internal object DefaultValues {
- const val MESSAGES_AMOUNT = 1
+ const val MESSAGES_AMOUNT = -1L
const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
const val CERT_FILE = "/etc/ves-hv/client.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
@@ -98,7 +98,7 @@ internal object ArgBasedClientConfiguration {
val cmdLine = parser.parse(options, args)
val host = cmdLine.stringValue(OPT_VES_HOST)
val port = cmdLine.intValue(OPT_VES_PORT)
- val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
+ val msgsAmount = cmdLine.longValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
return ClientConfiguration(
host,
port,
@@ -121,8 +121,9 @@ internal object ArgBasedClientConfiguration {
private fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
- private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
- getOptionValue(option.opt)?.toInt() ?: default
+
+ private fun CommandLine.longValueOrDefault(option: Option, default: Long) =
+ getOptionValue(option.opt)?.toLong() ?: default
private fun CommandLine.intValue(option: Option) =
getOptionValue(option.opt).toInt()
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
index e835ee95..83d6f7c0 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
@@ -27,4 +27,4 @@ data class ClientConfiguration(
val vesHost: String,
val vesPort: Int,
val security: ClientSecurityConfiguration,
- val messagesAmount: Int)
+ val messagesAmount: Long)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
index 0c578b38..d5f7c7c8 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
@@ -38,8 +38,13 @@ class MessageFactory {
const val DEFAULT_LAST_EPOCH: Long = 120034455
}
- fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
- Mono.just(createMessage()).repeat(amount.toLong())
+ fun createMessageFlux(amount: Long = -1): Flux<WireFrame> =
+ Mono.fromCallable(this::createMessage).let {
+ if (amount < 0)
+ it.repeat()
+ else
+ it.repeat(amount)
+ }
private fun createMessage(): WireFrame {
@@ -57,14 +62,7 @@ class MessageFactory {
.build()
val payload = vesMessageBytes(commonHeader)
- return WireFrame(
- payload = payload,
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 2,
- payloadSize = payload.readableBytes())
-
-
+ return WireFrame(payload)
}
private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index c911c533..29573e86 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
@@ -29,6 +30,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfig
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
@@ -53,7 +55,6 @@ class VesHvClient(configuration: ClientConfiguration) {
client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
}
- // sending flux with multiple WireFrames not supported yet
private fun handler(nettyInbound: NettyInbound,
nettyOutbound: NettyOutbound,
messages: Flux<WireFrame>): Publisher<Void> {
@@ -64,8 +65,8 @@ class VesHvClient(configuration: ClientConfiguration) {
.subscribe { str -> logger.info("Server response: $str") }
val frames = messages
- .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } }
.map { it.encode(nettyOutbound.alloc()) }
+ .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
return nettyOutbound
.options { it.flushOnEach() }
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index ee7f49a6..68f999ef 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -36,12 +36,14 @@ fun main(args: Array<String>) {
val clientConfig = ArgBasedClientConfiguration.parse(args)
val messageFactory = MessageFactory()
val client = VesHvClient(clientConfig)
- client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+ client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
} catch (e: ArgBasedClientConfiguration.WrongArgumentException) {
e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
+ System.exit(1)
} catch (e: Exception) {
logger.error(e.localizedMessage)
logger.debug("An error occurred when starting ves client", e)
+ System.exit(2)
}
}
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
index edcec65f..405a15eb 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
@@ -23,8 +23,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
-import kotlin.test.assertEquals
+import reactor.test.test
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -35,16 +34,18 @@ object MessageFactoryTest : Spek({
val factory = MessageFactory()
given("no parameters") {
- it("should return flux with one message") {
- val result = factory.createMessageFlux()
-
- assertEquals(1, result.count().block())
+ it("should return infinite flux") {
+ val limit = 1000L
+ factory.createMessageFlux().take(limit).test()
+ .expectNextCount(limit)
+ .verifyComplete()
}
}
given("messages amount") {
it("should return message flux of specified size") {
- val result = factory.createMessageFlux(5)
- assertEquals(5, result.count().block())
+ factory.createMessageFlux(5).test()
+ .expectNextCount(5)
+ .verifyComplete()
}
}
}