aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator/src
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-xnf-simulator/src')
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt8
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt43
4 files changed, 27 insertions, 32 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
index f4c92fd4..a6d6af84 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.api
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import reactor.core.publisher.Flux
@@ -28,5 +28,5 @@ import reactor.core.publisher.Flux
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index b67bc644..6346b648 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import ratpack.exec.Promise
@@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
}
}
- private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
return ctx.request.body
.map { Json.createReader(it.inputStream).readObject() }
.map { extractMessageParameters(it) }
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
index 0d28bad0..baff967a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.ves.VesEventV5.VesEvent
@@ -35,7 +35,7 @@ import javax.json.JsonObject
*/
internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
if (messageParameters.amount < 0)
it.repeat()
@@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat
.build()
- private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
- WireFrame(vesMessageBytes(commonHeader))
+ private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
+ PayloadWireFrameMessage(vesMessageBytes(commonHeader))
private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 6487888e..2f9e0b59 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -20,13 +20,13 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
}
.build()
- fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
sendRx(messages).block()
}
- fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
@@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
return complete.then()
}
- private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound):
Publisher<Void> {
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
- val context = nettyOutbound.context()
-
- context.onClose {
- logger.info { "Connection to ${context.address()} has been closed" }
- }
-
- // TODO: Close channel after all messages have been sent
- // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
-// complete.subscribe {
-// context.channel().disconnect().addListener {
-// if (it.isSuccess)
-// logger.info { "Connection closed" }
-// else
-// logger.warn("Failed to close the connection", it.cause())
-// }
-// }
-
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
.window(MAX_BATCH_SIZE)
return nettyOutbound
+ .logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
@@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
.clientAuth(ClientAuth.REQUIRE)
.build()
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
companion object {
private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
private val logger = Logger(XnfSimulator::class)
}
}