aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator
diff options
context:
space:
mode:
authorfkrzywka <filip.krzywka@nokia.com>2018-07-03 10:14:38 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 13:41:04 +0200
commitf8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch)
tree634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-xnf-simulator
parent1383775f3df00bd08a7ac14fe1278858bdef6487 (diff)
Enhance wire protocol
Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka <filip.krzywka@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-xnf-simulator')
-rw-r--r--hv-collector-xnf-simulator/sample-request.json20
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt8
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt43
5 files changed, 27 insertions, 52 deletions
diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json
deleted file mode 100644
index ca8bd885..00000000
--- a/hv-collector-xnf-simulator/sample-request.json
+++ /dev/null
@@ -1,20 +0,0 @@
-{
- "commonEventHeader": {
- "version": "sample-version",
- "domain": 10,
- "sequence": 1,
- "priority": 1,
- "eventId": "sample-event-id",
- "eventName": "sample-event-name",
- "eventType": "sample-event-type",
- "startEpochMicrosec": 120034455,
- "lastEpochMicrosec": 120034455,
- "nfNamingCode": "sample-nf-naming-code",
- "nfcNamingCode": "sample-nfc-naming-code",
- "reportingEntityId": "sample-reporting-entity-id",
- "reportingEntityName": "sample-reporting-entity-name",
- "sourceId": "sample-source-id",
- "sourceName": "sample-source-name"
- },
- "messagesAmount": 25000
-}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
index f4c92fd4..a6d6af84 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt
@@ -19,7 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.api
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import reactor.core.publisher.Flux
@@ -28,5 +28,5 @@ import reactor.core.publisher.Flux
* @since June 2018
*/
interface MessageGenerator {
- fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame>
+ fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage>
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index b67bc644..6346b648 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import ratpack.exec.Promise
@@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) {
}
}
- private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> {
return ctx.request.body
.map { Json.createReader(it.inputStream).readObject() }
.map { extractMessageParameters(it) }
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
index 0d28bad0..baff967a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt
@@ -20,7 +20,7 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.ves.VesEventV5.VesEvent
@@ -35,7 +35,7 @@ import javax.json.JsonObject
*/
internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
- override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> =
+ override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> =
Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let {
if (messageParameters.amount < 0)
it.repeat()
@@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat
.build()
- private fun createMessage(commonHeader: CommonEventHeader): WireFrame =
- WireFrame(vesMessageBytes(commonHeader))
+ private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage =
+ PayloadWireFrameMessage(vesMessageBytes(commonHeader))
private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray =
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 6487888e..2f9e0b59 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -20,13 +20,13 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import arrow.effects.IO
-import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
+import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
}
.build()
- fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> {
sendRx(messages).block()
}
- fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
@@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
return complete.then()
}
- private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<PayloadWireFrameMessage>,
+ nettyOutbound: NettyOutbound):
Publisher<Void> {
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
- val context = nettyOutbound.context()
-
- context.onClose {
- logger.info { "Connection to ${context.address()} has been closed" }
- }
-
- // TODO: Close channel after all messages have been sent
- // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
-// complete.subscribe {
-// context.channel().disconnect().addListener {
-// if (it.isSuccess)
-// logger.info { "Connection closed" }
-// else
-// logger.warn("Failed to close the connection", it.cause())
-// }
-// }
-
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
.window(MAX_BATCH_SIZE)
return nettyOutbound
+ .logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
@@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) {
.clientAuth(ClientAuth.REQUIRE)
.build()
+ private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
+ context().onClose {
+ logger.info { "Connection to ${context().address()} has been closed" }
+ }
+ return this
+ }
+
companion object {
private const val MAX_BATCH_SIZE = 128
+ private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
private val logger = Logger(XnfSimulator::class)
}
}