From 07bbbf71cd65b29f446a1b475add87f20365db83 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Thu, 7 Jun 2018 11:52:16 +0200 Subject: Fix TCP stream framing issue Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../xnf/config/ArgBasedClientConfiguration.kt | 9 +++++---- .../veshv/simulators/xnf/config/ClientConfiguration.kt | 2 +- .../veshv/simulators/xnf/impl/MessageFactory.kt | 18 ++++++++---------- .../veshv/simulators/xnf/impl/VesHvClient.kt | 5 +++-- .../onap/dcae/collectors/veshv/simulators/xnf/main.kt | 4 +++- .../veshv/simulators/xnf/impl/MessageFactoryTest.kt | 17 +++++++++-------- 6 files changed, 29 insertions(+), 26 deletions(-) (limited to 'hv-collector-client-simulator/src') diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index 49653b57..b946689f 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -28,7 +28,7 @@ import java.io.File import java.nio.file.Paths internal object DefaultValues { - const val MESSAGES_AMOUNT = 1 + const val MESSAGES_AMOUNT = -1L const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" const val CERT_FILE = "/etc/ves-hv/client.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" @@ -98,7 +98,7 @@ internal object ArgBasedClientConfiguration { val cmdLine = parser.parse(options, args) val host = cmdLine.stringValue(OPT_VES_HOST) val port = cmdLine.intValue(OPT_VES_PORT) - val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT) + val msgsAmount = cmdLine.longValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT) return ClientConfiguration( host, port, @@ -121,8 +121,9 @@ internal object ArgBasedClientConfiguration { private fun stringPathToPath(path: String) = Paths.get(File(path).toURI()) - private fun CommandLine.intValueOrDefault(option: Option, default: Int) = - getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.longValueOrDefault(option: Option, default: Long) = + getOptionValue(option.opt)?.toLong() ?: default private fun CommandLine.intValue(option: Option) = getOptionValue(option.opt).toInt() diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt index e835ee95..83d6f7c0 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt @@ -27,4 +27,4 @@ data class ClientConfiguration( val vesHost: String, val vesPort: Int, val security: ClientSecurityConfiguration, - val messagesAmount: Int) + val messagesAmount: Long) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index 0c578b38..d5f7c7c8 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -38,8 +38,13 @@ class MessageFactory { const val DEFAULT_LAST_EPOCH: Long = 120034455 } - fun createMessageFlux(amount: Int = 1): Flux = - Mono.just(createMessage()).repeat(amount.toLong()) + fun createMessageFlux(amount: Long = -1): Flux = + Mono.fromCallable(this::createMessage).let { + if (amount < 0) + it.repeat() + else + it.repeat(amount) + } private fun createMessage(): WireFrame { @@ -57,14 +62,7 @@ class MessageFactory { .build() val payload = vesMessageBytes(commonHeader) - return WireFrame( - payload = payload, - mark = 0xFF, - majorVersion = 1, - minorVersion = 2, - payloadSize = payload.readableBytes()) - - + return WireFrame(payload) } private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index c911c533..29573e86 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder @@ -29,6 +30,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfig import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient @@ -53,7 +55,6 @@ class VesHvClient(configuration: ClientConfiguration) { client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) }) } - // sending flux with multiple WireFrames not supported yet private fun handler(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound, messages: Flux): Publisher { @@ -64,8 +65,8 @@ class VesHvClient(configuration: ClientConfiguration) { .subscribe { str -> logger.info("Server response: $str") } val frames = messages - .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } } .map { it.encode(nettyOutbound.alloc()) } + .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) return nettyOutbound .options { it.flushOnEach() } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ee7f49a6..68f999ef 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -36,12 +36,14 @@ fun main(args: Array) { val clientConfig = ArgBasedClientConfiguration.parse(args) val messageFactory = MessageFactory() val client = VesHvClient(clientConfig) - client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) + client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) } catch (e: ArgBasedClientConfiguration.WrongArgumentException) { e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") + System.exit(1) } catch (e: Exception) { logger.error(e.localizedMessage) logger.debug("An error occurred when starting ves client", e) + System.exit(2) } } diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt index edcec65f..405a15eb 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt @@ -23,8 +23,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory -import kotlin.test.assertEquals +import reactor.test.test /** * @author Jakub Dudycz @@ -35,16 +34,18 @@ object MessageFactoryTest : Spek({ val factory = MessageFactory() given("no parameters") { - it("should return flux with one message") { - val result = factory.createMessageFlux() - - assertEquals(1, result.count().block()) + it("should return infinite flux") { + val limit = 1000L + factory.createMessageFlux().take(limit).test() + .expectNextCount(limit) + .verifyComplete() } } given("messages amount") { it("should return message flux of specified size") { - val result = factory.createMessageFlux(5) - assertEquals(5, result.count().block()) + factory.createMessageFlux(5).test() + .expectNextCount(5) + .verifyComplete() } } } -- cgit 1.2.3-korg