summaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2018-09-21 10:14:03 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2018-09-24 08:22:29 +0200
commite880cde732b6d5b6a2fd22b2245ba7f6ff4517f3 (patch)
tree256bd77a86bf86fce96979643a9fe5fcc0318aba /hv-collector-xnf-simulator
parent7333951cfec6b79a92b12e70cf679bff2f01825a (diff)
Remove end-of-transmission message from protocol
Also update protobuf files definitions to latest version. Change-Id: I0cd5d2d8deec5c787e2d3948d3d905fa672f9fea Issue-ID: DCAEGEN2-775 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'hv-collector-xnf-simulator')
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt1
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt11
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt4
3 files changed, 6 insertions, 10 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 558bd1c1..3fde2c7e 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -23,7 +23,6 @@ import arrow.core.Either
import arrow.core.Some
import arrow.core.Try
import arrow.core.fix
-import arrow.core.flatMap
import arrow.core.monad
import arrow.effects.IO
import arrow.typeclasses.binding
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index d1a5296a..af71e9ce 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -24,8 +24,7 @@ import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
@@ -53,10 +52,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
}
.build()
- fun sendIo(messages: Flux<PayloadWireFrameMessage>) =
+ fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
- private fun sendRx(messages: Flux<PayloadWireFrameMessage>): Mono<Void> {
+ private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
.newHandler { _, output -> handler(complete, messages, output) }
@@ -72,7 +71,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
}
private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<PayloadWireFrameMessage>,
+ messages: Flux<WireFrameMessage>,
nettyOutbound: NettyOutbound): Publisher<Void> {
val allocator = nettyOutbound.alloc()
@@ -85,7 +84,6 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
.logConnectionClosed()
.options { it.flushOnBoundary() }
.sendGroups(frames)
- .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt())))
.then {
logger.info("Messages have been sent")
complete.onComplete()
@@ -117,6 +115,5 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
companion object {
private val logger = Logger(VesHvClient::class)
private const val MAX_BATCH_SIZE = 128
- private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE
}
}
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 80f39579..97535887 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -30,7 +30,7 @@ import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
@@ -98,7 +98,7 @@ internal class XnfSimulatorTest : Spek({
// given
val json = "[true]".byteInputStream()
val messageParams = listOf<MessageParameters>()
- val generatedMessages = Flux.empty<PayloadWireFrameMessage>()
+ val generatedMessages = Flux.empty<WireFrameMessage>()
val sendingIo = IO {}
whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)