diff options
Diffstat (limited to 'hv-collector-xnf-simulator/src')
3 files changed, 6 insertions, 10 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 558bd1c1..3fde2c7e 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -23,7 +23,6 @@ import arrow.core.Either import arrow.core.Some import arrow.core.Try import arrow.core.fix -import arrow.core.flatMap import arrow.core.monad import arrow.effects.IO import arrow.typeclasses.binding diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index d1a5296a..af71e9ce 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration @@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux<PayloadWireFrameMessage>) = + fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() - private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> { + private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<PayloadWireFrameMessage>, + messages: Flux<WireFrameMessage>, nettyOutbound: NettyOutbound): Publisher<Void> { val allocator = nettyOutbound.alloc() @@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { companion object { private val logger = Logger(VesHvClient::class) private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE } } diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 80f39579..97535887 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat @@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({ // given val json = "[true]".byteInputStream() val messageParams = listOf<MessageParameters>() - val generatedMessages = Flux.empty<PayloadWireFrameMessage>() + val generatedMessages = Flux.empty<WireFrameMessage>() val sendingIo = IO {} whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) |