diff options
author | fkrzywka <filip.krzywka@nokia.com> | 2018-07-03 10:14:38 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 13:41:04 +0200 |
commit | f8a9a10a75bf139203fe9ea48a01708c7bda0781 (patch) | |
tree | 634321d472c69d67f817cd2e689dc25c10af7c1a /hv-collector-xnf-simulator | |
parent | 1383775f3df00bd08a7ac14fe1278858bdef6487 (diff) |
Enhance wire protocol
Handle new wire frame message type which should allow clients to
indicate that all data has been sent to collector
Change xNF Simulator to send end-of-transmission message
after sending all messages
Close ves-hv-collector stream after encountering EOT message
Remove duplicated file in project
Closes ONAP-391
Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16
Signed-off-by: fkrzywka <filip.krzywka@nokia.com>
Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-xnf-simulator')
5 files changed, 27 insertions, 52 deletions
diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json deleted file mode 100644 index ca8bd885..00000000 --- a/hv-collector-xnf-simulator/sample-request.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "commonEventHeader": { - "version": "sample-version", - "domain": 10, - "sequence": 1, - "priority": 1, - "eventId": "sample-event-id", - "eventName": "sample-event-name", - "eventType": "sample-event-type", - "startEpochMicrosec": 120034455, - "lastEpochMicrosec": 120034455, - "nfNamingCode": "sample-nf-naming-code", - "nfcNamingCode": "sample-nfc-naming-code", - "reportingEntityId": "sample-reporting-entity-id", - "reportingEntityName": "sample-reporting-entity-name", - "sourceId": "sample-source-id", - "sourceName": "sample-source-name" - }, - "messagesAmount": 25000 -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt index f4c92fd4..a6d6af84 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.api -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import reactor.core.publisher.Flux @@ -28,5 +28,5 @@ import reactor.core.publisher.Flux * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> + fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index b67bc644..6346b648 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger import ratpack.exec.Promise @@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> { + private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { return ctx.request.body .map { Json.createReader(it.inputStream).readObject() } .map { extractMessageParameters(it) } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt index 0d28bad0..baff967a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.ves.VesEventV5.VesEvent @@ -35,7 +35,7 @@ import javax.json.JsonObject */ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = + override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> = Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { if (messageParameters.amount < 0) it.repeat() @@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat .build() - private fun createMessage(commonHeader: CommonEventHeader): WireFrame = - WireFrame(vesMessageBytes(commonHeader)) + private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = + PayloadWireFrameMessage(vesMessageBytes(commonHeader)) private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 6487888e..2f9e0b59 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<WireFrame>) = IO<Unit> { + fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> { sendRx(messages).block() } - fun sendRx(messages: Flux<WireFrame>): Mono<Void> { + fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { return complete.then() } - private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound): + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<PayloadWireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { - val encoder = WireFrameEncoder(nettyOutbound.alloc()) - val context = nettyOutbound.context() - - context.onClose { - logger.info { "Connection to ${context.address()} has been closed" } - } - - // TODO: Close channel after all messages have been sent - // The code bellow doesn't work because it closes the channel earlier and not all are consumed... -// complete.subscribe { -// context.channel().disconnect().addListener { -// if (it.isSuccess) -// logger.info { "Connection closed" } -// else -// logger.warn("Failed to close the connection", it.cause()) -// } -// } - + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) val frames = messages .map(encoder::encode) .window(MAX_BATCH_SIZE) return nettyOutbound + .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { .clientAuth(ClientAuth.REQUIRE) .build() + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + companion object { private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE private val logger = Logger(XnfSimulator::class) } } |