From f8a9a10a75bf139203fe9ea48a01708c7bda0781 Mon Sep 17 00:00:00 2001 From: fkrzywka Date: Tue, 3 Jul 2018 10:14:38 +0200 Subject: Enhance wire protocol Handle new wire frame message type which should allow clients to indicate that all data has been sent to collector Change xNF Simulator to send end-of-transmission message after sending all messages Close ves-hv-collector stream after encountering EOT message Remove duplicated file in project Closes ONAP-391 Change-Id: Idb6afc41d4bb0220a29df10c2aecfd76acd3ad16 Signed-off-by: fkrzywka Issue-ID: DCAEGEN2-601 --- hv-collector-xnf-simulator/sample-request.json | 20 ---------- .../veshv/simulators/xnf/api/MessageGenerator.kt | 4 +- .../veshv/simulators/xnf/impl/HttpServer.kt | 4 +- .../simulators/xnf/impl/MessageGeneratorImpl.kt | 8 ++-- .../veshv/simulators/xnf/impl/XnfSimulator.kt | 43 ++++++++++------------ 5 files changed, 27 insertions(+), 52 deletions(-) delete mode 100644 hv-collector-xnf-simulator/sample-request.json (limited to 'hv-collector-xnf-simulator') diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json deleted file mode 100644 index ca8bd885..00000000 --- a/hv-collector-xnf-simulator/sample-request.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "commonEventHeader": { - "version": "sample-version", - "domain": 10, - "sequence": 1, - "priority": 1, - "eventId": "sample-event-id", - "eventName": "sample-event-name", - "eventType": "sample-event-type", - "startEpochMicrosec": 120034455, - "lastEpochMicrosec": 120034455, - "nfNamingCode": "sample-nf-naming-code", - "nfcNamingCode": "sample-nfc-naming-code", - "reportingEntityId": "sample-reporting-entity-id", - "reportingEntityName": "sample-reporting-entity-name", - "sourceId": "sample-source-id", - "sourceName": "sample-source-name" - }, - "messagesAmount": 25000 -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt index f4c92fd4..a6d6af84 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.api -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import reactor.core.publisher.Flux @@ -28,5 +28,5 @@ import reactor.core.publisher.Flux * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux + fun createMessageFlux(messageParameters: MessageParameters): Flux } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index b67bc644..6346b648 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger import ratpack.exec.Promise @@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise> { + private fun createMessageFlux(ctx: Context): Promise> { return ctx.request.body .map { Json.createReader(it.inputStream).readObject() } .map { extractMessageParameters(it) } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt index 0d28bad0..baff967a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.ves.VesEventV5.VesEvent @@ -35,7 +35,7 @@ import javax.json.JsonObject */ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux = + override fun createMessageFlux(messageParameters: MessageParameters): Flux = Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { if (messageParameters.amount < 0) it.repeat() @@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat .build() - private fun createMessage(commonHeader: CommonEventHeader): WireFrame = - WireFrame(vesMessageBytes(commonHeader)) + private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = + PayloadWireFrameMessage(vesMessageBytes(commonHeader)) private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 6487888e..2f9e0b59 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux) = IO { + fun sendIo(messages: Flux) = IO { sendRx(messages).block() } - fun sendRx(messages: Flux): Mono { + fun sendRx(messages: Flux): Mono { val complete = ReplayProcessor.create(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { return complete.then() } - private fun handler(complete: ReplayProcessor, messages: Flux, nettyOutbound: NettyOutbound): + private fun handler(complete: ReplayProcessor, + messages: Flux, + nettyOutbound: NettyOutbound): Publisher { - val encoder = WireFrameEncoder(nettyOutbound.alloc()) - val context = nettyOutbound.context() - - context.onClose { - logger.info { "Connection to ${context.address()} has been closed" } - } - - // TODO: Close channel after all messages have been sent - // The code bellow doesn't work because it closes the channel earlier and not all are consumed... -// complete.subscribe { -// context.channel().disconnect().addListener { -// if (it.isSuccess) -// logger.info { "Connection closed" } -// else -// logger.warn("Failed to close the connection", it.cause()) -// } -// } - + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) val frames = messages .map(encoder::encode) .window(MAX_BATCH_SIZE) return nettyOutbound + .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { .clientAuth(ClientAuth.REQUIRE) .build() + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + companion object { private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE private val logger = Logger(XnfSimulator::class) } } -- cgit 1.2.3-korg