diff options
author | 2018-09-24 10:11:42 +0000 | |
---|---|---|
committer | 2018-09-24 10:11:42 +0000 | |
commit | 7b269674526a267f14895df8b825f3b59b30b98a (patch) | |
tree | 606cdabf7354c0d2a70aa2e0c63a1e36c0a76d1a /hv-collector-xnf-simulator/src/main | |
parent | 8f4a019fb04f0b3c408179ae1ae6daa9b742cdba (diff) | |
parent | e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (diff) |
Merge "Remove end-of-transmission message from protocol"
Diffstat (limited to 'hv-collector-xnf-simulator/src/main')
2 files changed, 4 insertions, 8 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 558bd1c1..3fde2c7e 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -23,7 +23,6 @@ import arrow.core.Either import arrow.core.Some import arrow.core.Try import arrow.core.fix -import arrow.core.flatMap import arrow.core.monad import arrow.effects.IO import arrow.typeclasses.binding diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index d1a5296a..af71e9ce 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration @@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() - private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, + messages: Flux<WireFrameMessage>, nettyOutbound: NettyOutbound): Publisher<Void> { val allocator = nettyOutbound.alloc() @@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { companion object { private val logger = Logger(VesHvClient::class) private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE } } |