diff options
author | Przemyslaw Wasala <przemyslaw.wasala@nokia.com> | 2018-09-24 10:11:42 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-09-24 10:11:42 +0000 |
commit | 7b269674526a267f14895df8b825f3b59b30b98a (patch) | |
tree | 606cdabf7354c0d2a70aa2e0c63a1e36c0a76d1a /hv-collector-xnf-simulator | |
parent | 8f4a019fb04f0b3c408179ae1ae6daa9b742cdba (diff) | |
parent | e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (diff) |
Merge "Remove end-of-transmission message from protocol"
Diffstat (limited to 'hv-collector-xnf-simulator')
3 files changed, 6 insertions, 10 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 558bd1c1..3fde2c7e 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -23,7 +23,6 @@ import arrow.core.Either import arrow.core.Some import arrow.core.Try import arrow.core.fix -import arrow.core.flatMap import arrow.core.monad import arrow.effects.IO import arrow.typeclasses.binding diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index d1a5296a..af71e9ce 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration @@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() - private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, + messages: Flux<WireFrameMessage>, nettyOutbound: NettyOutbound): Publisher<Void> { val allocator = nettyOutbound.alloc() @@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { companion object { private val logger = Logger(VesHvClient::class) private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE } } diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 80f39579..97535887 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat @@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({ // given val json = "[true]".byteInputStream() val messageParams = listOf<MessageParameters>() - val generatedMessages = Flux.empty<PayloadWireFrameMessage>() + val generatedMessages = Flux.empty<WireFrameMessage>() val sendingIo = IO {} whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) |