diff options
Diffstat (limited to 'hv-collector-xnf-simulator')
5 files changed, 27 insertions, 52 deletions
diff --git a/hv-collector-xnf-simulator/sample-request.json b/hv-collector-xnf-simulator/sample-request.json deleted file mode 100644 index ca8bd885..00000000 --- a/hv-collector-xnf-simulator/sample-request.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "commonEventHeader": { - "version": "sample-version", - "domain": 10, - "sequence": 1, - "priority": 1, - "eventId": "sample-event-id", - "eventName": "sample-event-name", - "eventType": "sample-event-type", - "startEpochMicrosec": 120034455, - "lastEpochMicrosec": 120034455, - "nfNamingCode": "sample-nf-naming-code", - "nfcNamingCode": "sample-nfc-naming-code", - "reportingEntityId": "sample-reporting-entity-id", - "reportingEntityName": "sample-reporting-entity-name", - "sourceId": "sample-source-id", - "sourceName": "sample-source-name" - }, - "messagesAmount": 25000 -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt index f4c92fd4..a6d6af84 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/api/MessageGenerator.kt @@ -19,7 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.api -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import reactor.core.publisher.Flux @@ -28,5 +28,5 @@ import reactor.core.publisher.Flux * @since June 2018 */ interface MessageGenerator { - fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> + fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index b67bc644..6346b648 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger import ratpack.exec.Promise @@ -65,7 +65,7 @@ internal class HttpServer(private val vesClient: XnfSimulator) { } } - private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> { + private fun createMessageFlux(ctx: Context): Promise<Flux<PayloadWireFrameMessage>> { return ctx.request.body .map { Json.createReader(it.inputStream).readObject() } .map { extractMessageParameters(it) } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt index 0d28bad0..baff967a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageGeneratorImpl.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.api.MessageGenerator import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.ves.VesEventV5.VesEvent @@ -35,7 +35,7 @@ import javax.json.JsonObject */ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerator) : MessageGenerator { - override fun createMessageFlux(messageParameters: MessageParameters): Flux<WireFrame> = + override fun createMessageFlux(messageParameters: MessageParameters): Flux<PayloadWireFrameMessage> = Mono.fromCallable { createMessage(messageParameters.commonEventHeader) }.let { if (messageParameters.amount < 0) it.repeat() @@ -62,8 +62,8 @@ internal class MessageGeneratorImpl(private val payloadGenerator: PayloadGenerat .build() - private fun createMessage(commonHeader: CommonEventHeader): WireFrame = - WireFrame(vesMessageBytes(commonHeader)) + private fun createMessage(commonHeader: CommonEventHeader): PayloadWireFrameMessage = + PayloadWireFrameMessage(vesMessageBytes(commonHeader)) private fun vesMessageBytes(commonHeader: CommonEventHeader): ByteArray = diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 6487888e..2f9e0b59 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO -import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -52,11 +52,11 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<WireFrame>) = IO<Unit> { + fun sendIo(messages: Flux<PayloadWireFrameMessage>) = IO<Unit> { sendRx(messages).block() } - fun sendRx(messages: Flux<WireFrame>): Mono<Void> { + fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -71,34 +71,21 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { return complete.then() } - private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound): + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<PayloadWireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { - val encoder = WireFrameEncoder(nettyOutbound.alloc()) - val context = nettyOutbound.context() - - context.onClose { - logger.info { "Connection to ${context.address()} has been closed" } - } - - // TODO: Close channel after all messages have been sent - // The code bellow doesn't work because it closes the channel earlier and not all are consumed... -// complete.subscribe { -// context.channel().disconnect().addListener { -// if (it.isSuccess) -// logger.info { "Connection closed" } -// else -// logger.warn("Failed to close the connection", it.cause()) -// } -// } - + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) val frames = messages .map(encoder::encode) .window(MAX_BATCH_SIZE) return nettyOutbound + .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -114,8 +101,16 @@ internal class XnfSimulator(private val configuration: SimulatorConfiguration) { .clientAuth(ClientAuth.REQUIRE) .build() + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + companion object { private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE private val logger = Logger(XnfSimulator::class) } } |