From e880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Fri, 21 Sep 2018 10:14:03 +0200 Subject: Remove end-of-transmission message from protocol Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka --- .../dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt | 1 - .../veshv/simulators/xnf/impl/adapters/VesHvClient.kt | 11 ++++------- .../org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt | 4 ++-- 3 files changed, 6 insertions(+), 10 deletions(-) (limited to 'hv-collector-xnf-simulator') diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 558bd1c1..3fde2c7e 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -23,7 +23,6 @@ import arrow.core.Either import arrow.core.Some import arrow.core.Try import arrow.core.fix -import arrow.core.flatMap import arrow.core.monad import arrow.effects.IO import arrow.typeclasses.binding diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index d1a5296a..af71e9ce 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration @@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } .build() - fun sendIo(messages: Flux) = + fun sendIo(messages: Flux) = sendRx(messages).then(Mono.just(Unit)).asIo() - private fun sendRx(messages: Flux): Mono { + private fun sendRx(messages: Flux): Mono { val complete = ReplayProcessor.create(1) client .newHandler { _, output -> handler(complete, messages, output) } @@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { } private fun handler(complete: ReplayProcessor, - messages: Flux, + messages: Flux, nettyOutbound: NettyOutbound): Publisher { val allocator = nettyOutbound.alloc() @@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .logConnectionClosed() .options { it.flushOnBoundary() } .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) .then { logger.info("Messages have been sent") complete.onComplete() @@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { companion object { private val logger = Logger(VesHvClient::class) private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE } } diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 80f39579..97535887 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat @@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({ // given val json = "[true]".byteInputStream() val messageParams = listOf() - val generatedMessages = Flux.empty() + val generatedMessages = Flux.empty() val sendingIo = IO {} whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) -- cgit 1.2.3-korg